[jboss-cvs] JBoss Messaging SVN: r1960 - in branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742: tests/src/org/jboss/test/messaging/jms and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 11 12:27:24 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-01-11 12:27:19 -0500 (Thu, 11 Jan 2007)
New Revision: 1960
Modified:
branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java
branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-742 - fix
Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java 2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java 2007-01-11 17:27:19 UTC (rev 1960)
@@ -1,6 +1,6 @@
/*
* JBoss, the OpenSource J2EE webOS
- *
+ *
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
@@ -70,100 +70,100 @@
public class JDBCChannelMapper extends ServiceMBeanSupport implements ChannelMapper
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(JDBCChannelMapper.class);
-
+
protected static final char TYPE_QUEUE = 'Q';
-
+
protected static final char TYPE_TOPIC = 'T';
-
+
protected static final char TYPE_DURABLE_SUB = 'D';
-
+
//========================================
-
+
//FIXME - user-role table management should be handled by a different service
//It doesn't belong here
-
+
private String createUserTable =
"CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128),"
+ " PRIMARY KEY(USERID))";
-
+
private String createRoleTable = "CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL,"
+ " PRIMARY KEY(USERID, ROLEID))";
-
- private String selectPreConfClientId =
+
+ private String selectPreConfClientId =
"SELECT CLIENTID FROM JMS_USER WHERE USERID=?";
-
-
+
+
// =============================================================================
-
- private String createMappingTable =
+
+ private String createMappingTable =
"CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), " +
"JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), " +
"CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))";
-
- private String insertMapping =
+
+ private String insertMapping =
"INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)";
-
+
private String deleteMapping =
"DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?";
-
- private String selectIdForDestination =
+
+ private String selectIdForDestination =
"SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?";
-
- private String selectDurableSub =
+
+ private String selectDurableSub =
"SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?";
-
- private String selectSubscriptionsForTopic =
+
+ private String selectSubscriptionsForTopic =
"SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND J" +
"MS_DEST_NAME=?";
-
+
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
-
+
// Map<clientID - Map<subscriptionName - CoreDurableSubscription>>
protected Map subscriptions;
-
+
// Map<name - Queue>
protected Map queues;
-
+
// Map<name - Topic>
protected Map topics;
-
+
// Map<id - JBossDestination>
protected Map idMap;
-
+
protected String dataSourceJNDIName;
protected DataSource ds;
protected ObjectName tmObjectName;
protected IdManager channelIDManager;
-
+
protected boolean createTablesOnStartup = true;
protected Properties sqlProperties;
protected List populateTables;
-
+
protected QueuedExecutorPool queuedExecutorPool;
// Constructors --------------------------------------------------
-
+
public JDBCChannelMapper()
{
subscriptions = new ConcurrentReaderHashMap();
-
+
queues = new ConcurrentReaderHashMap();
-
+
topics = new ConcurrentReaderHashMap();
-
+
idMap = new ConcurrentReaderHashMap();
-
+
sqlProperties = new Properties();
-
+
populateTables = new ArrayList();
}
-
+
/*
* Only to be used by tests to create an instance
*/
@@ -177,23 +177,23 @@
}
//Injection
-
+
//TODO
//Since channel mapper requires knowledge of the persistence manager anyway so it can get ids for new channels
//there seems to me no reason why channel mapper should handle it's own persistence
//IMHO all persistence should be handled by the pm
public void setPersistenceManager(PersistenceManager pm) throws Exception
- {
+ {
this.channelIDManager = new IdManager("CHANNEL_ID", 10, pm);
}
-
+
public void setQueuedExecutorPool(QueuedExecutorPool pool) throws Exception
{
this.queuedExecutorPool = pool;
}
-
+
// ServiceMBeanSupport overrides ---------------------------------
-
+
protected void startService() throws Exception
{
try
@@ -204,67 +204,67 @@
ds = (DataSource)ic.lookup(dataSourceJNDIName);
ic.close();
}
-
+
if (ds == null)
{
throw new IllegalStateException("No DataSource found. This service dependencies must " +
"have not been enforced correctly!");
}
-
+
initSqlProperties();
-
+
if (createTablesOnStartup)
{
createSchema();
}
-
+
log.debug(this + " started");
}
catch (Throwable t)
{
throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
- }
+ }
}
-
+
protected void stopService() throws Exception
{
log.debug(this + " stopped");
}
-
+
// DurableSubscriptionStore implementation ---------------
-
+
public Object getInstance()
{
return this;
}
-
+
public CoreDestination getCoreDestination(JBossDestination jbDest)
{
return getCoreDestinationInternal(jbDest.isQueue(), jbDest.getName());
}
-
+
public JBossDestination getJBossDestination(long coreDestinationId)
{
return (JBossDestination)idMap.get(new Long(coreDestinationId));
- }
-
- public void deployCoreDestination(boolean isQueue,
+ }
+
+ public void deployCoreDestination(boolean isQueue,
String destName,
- MessageStore ms,
+ MessageStore ms,
PersistenceManager pm,
MemoryManager mm,
- int fullSize,
- int pageSize,
+ int fullSize,
+ int pageSize,
int downCacheSize) throws Exception
- {
+ {
if (log.isTraceEnabled()) { log.trace("creating core destination for " + destName); }
-
+
CoreDestination cd = getCoreDestinationInternal(isQueue, destName);
if (cd != null)
{
throw new JMSException("Destination " + destName + " already deployed");
}
-
+
// Might already be in db
long id;
Long l = getIdForDestination(isQueue, destName);
@@ -272,7 +272,7 @@
{
// Not in db - insert a new mapping row
id = this.getNextId();
-
+
insertMappingRow(id, isQueue ? TYPE_QUEUE : TYPE_TOPIC, destName, null, null, null, null);
}
@@ -280,15 +280,30 @@
{
id = l.longValue();
}
-
+
+ // Put in id map too
+
+ JBossDestination jbd ;
+
+ if (isQueue)
+ {
+ jbd = new JBossQueue(destName);
+ }
+ else
+ {
+ jbd = new JBossTopic(destName);
+ }
+
+ idMap.put(new Long(id), jbd);
+
// TODO I am using LocalQueues for the time being, switch to distributed Queues
if (isQueue)
{
// We allocate an executor for the queue from the rotating pool
QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
-
+
cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize, executor);
-
+
try
{
// we load the queue with any state it might have in the db
@@ -301,22 +316,22 @@
e2.setLinkedException(e);
throw e2;
}
-
+
queues.put(destName, cd);
}
else
{
// TODO I am using LocalTopics for the time being, switch to distributed Topics
cd = new Topic(id, fullSize, pageSize, downCacheSize);
-
+
topics.put(destName, cd);
-
+
// TODO: The following piece of code may be better placed either in the Topic itself or in
// the DurableSubscriptionStore - I'm not sure it really belongs here
-
+
// Load any durable subscriptions for the Topic
List durableSubs = loadDurableSubscriptionsForTopic(destName, ms, pm, mm);
-
+
Iterator iter = durableSubs.iterator();
while (iter.hasNext())
{
@@ -337,138 +352,123 @@
sub.connect();
}
}
-
- // Put in id map too
-
- JBossDestination jbd ;
-
- if (isQueue)
- {
- jbd = new JBossQueue(destName);
- }
- else
- {
- jbd = new JBossTopic(destName);
- }
-
- idMap.put(new Long(id), jbd);
log.debug("core destination " + cd + " (fullSize=" + fullSize + ", pageSize=" +
pageSize + ", downCacheSize=" + downCacheSize + ") deployed");
}
-
+
public CoreDestination undeployCoreDestination(boolean isQueue, String destName)
throws Exception
{
Map m = isQueue ? queues : topics;
-
+
CoreDestination dest = (CoreDestination)m.remove(destName);
-
+
if (dest != null)
- {
+ {
idMap.remove(new Long(dest.getId()));
-
+
//If topic need to remove durable subs from map too
-
+
if (!isQueue)
{
Topic topic = (Topic)dest;
-
+
Iterator iter = topic.iterator();
-
+
while (iter.hasNext())
{
CoreSubscription sub = (CoreSubscription)iter.next();
-
+
if (sub.isRecoverable())
{
DurableSubscription dursub = (DurableSubscription)sub;
-
+
String clientID = dursub.getClientID();
-
+
String name = dursub.getName();
-
+
removeDurableSubscriptionInMemory(clientID, name);
}
}
}
-
+
}
-
+
return dest;
}
-
-
- public void deployTemporaryCoreDestination(boolean isQueue,
+
+
+ public void deployTemporaryCoreDestination(boolean isQueue,
String destName,
long id,
- MessageStore ms,
+ MessageStore ms,
PersistenceManager pm,
MemoryManager mm,
- int fullSize,
- int pageSize,
+ int fullSize,
+ int pageSize,
int downCacheSize) throws Exception
- {
+ {
if (log.isTraceEnabled()) { log.trace("creating temporary core destination for " + destName); }
-
+
+ //Put in id map too
+
+ JBossDestination jbd ;
+
+ if (isQueue)
+ {
+ jbd = new JBossQueue(destName);
+ }
+ else
+ {
+ jbd = new JBossTopic(destName);
+ }
+
+ idMap.put(new Long(id), jbd);
+
CoreDestination cd = getCoreDestinationInternal(isQueue, destName);
if (cd != null)
{
throw new JMSException("Destination " + destName + " already deployed");
- }
-
+ }
+
if (isQueue)
{
//We allocate an executor for the queue from the rotating pool
QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
-
- cd = new Queue(id, ms, pm, mm, false, fullSize, pageSize, downCacheSize, executor);
-
+
+ cd = new Queue(id, ms, pm, mm, false, fullSize, pageSize, downCacheSize, executor);
+
queues.put(destName, cd);
}
else
{
cd = new Topic(id, fullSize, pageSize, downCacheSize);
-
- topics.put(destName, cd);
+
+ topics.put(destName, cd);
}
-
- //Put in id map too
-
- JBossDestination jbd ;
-
- if (isQueue)
- {
- jbd = new JBossQueue(destName);
- }
- else
- {
- jbd = new JBossTopic(destName);
- }
-
- idMap.put(new Long(id), jbd);
-
+
log.debug("core destination " + cd + " (fullSize=" + fullSize + ", pageSize=" +
pageSize + ", downCacheSize=" + downCacheSize + ") deployed");
}
-
+
public CoreDestination undeployTemporaryCoreDestination(boolean isQueue, String destName)
throws Exception
{
Map m = isQueue ? queues : topics;
-
+
CoreDestination dest = (CoreDestination)m.remove(destName);
-
+
if (dest != null)
- {
- idMap.remove(new Long(dest.getId()));
+ {
+ idMap.remove(new Long(dest.getId()));
}
-
+
return dest;
}
-
+
public DurableSubscription getDurableSubscription(String clientID,
- String subscriptionName,
+ String subscriptionName,
MessageStore ms,
PersistenceManager pm,
MemoryManager mm)
@@ -476,29 +476,29 @@
{
// look in memory first
DurableSubscription sub = getDurableSubscription(clientID, subscriptionName);
-
+
if (sub != null)
{
return sub;
}
-
+
// now look in the db
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(selectDurableSub);
-
+
ps.setString(1, clientID);
ps.setString(2, subscriptionName);
-
+
boolean exists = false;
-
+
try
{
rs = ps.executeQuery();
@@ -512,29 +512,29 @@
log.trace(s + (rs == null ? " failed!" : (exists ? " returned rows" : " did NOT return rows")));
}
}
-
+
if (exists)
{
String topicName = rs.getString(1);
long id = rs.getLong(2);
String selector = rs.getString(3);
boolean noLocal = rs.getString(4).equals("Y");
-
+
Selector sel = selector == null ? null : new Selector(selector);
-
+
Map subs = (Map)subscriptions.get(clientID);
if (subs == null)
{
subs = new ConcurrentReaderHashMap();
subscriptions.put(clientID, subs);
}
-
+
//The subscription might be in the database, but the Topic which owns the subscription
//might not be deployed.
//In this case the user needs to make sure the Topic is deployed
-
+
Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-
+
if (topic == null)
{
throw new MessagingJMSException("Unable to get subscription: " + subscriptionName
@@ -542,16 +542,16 @@
+ clientID + " which belongs to topic: " + topicName
+ " since this topic is not currently deployed. Please deploy the topic and try again");
}
-
-
+
+
// create in memory
sub = createDurableSubscriptionInternal(id, topicName, clientID, subscriptionName, sel,
noLocal, ms, pm, mm);
-
+
// load its state
sub.load();
}
-
+
return sub;
}
finally
@@ -569,7 +569,7 @@
conn.close();
}
wrap.end();
- }
+ }
}
public DurableSubscription createDurableSubscription(String topicName,
@@ -582,17 +582,17 @@
MemoryManager mm) throws Exception
{
Selector sel = selector == null ? null : new Selector(selector);
-
+
long id;
// first insert a row in the db
id = this.getNextId();
-
+
insertMappingRow(id, TYPE_DURABLE_SUB, topicName, subscriptionName, clientID,
selector, new Boolean(noLocal));
-
+
return createDurableSubscriptionInternal(id, topicName, clientID, subscriptionName, sel,
- noLocal, ms, pm, mm);
+ noLocal, ms, pm, mm);
}
public Subscription createSubscription(String topicName, String selector, boolean noLocal,
@@ -603,9 +603,9 @@
Selector sel = selector == null ? null : new Selector(selector);
long id = this.getNextId();
-
+
Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-
+
if (topic == null)
{
throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
@@ -614,24 +614,24 @@
//We allocate an executor for the subscription from the rotating pool
//Currently all subscriptions for the same topic share the same executor
QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
-
+
return new Subscription(id, topic, ms, pm, mm,
topic.getFullSize(), topic.getPageSize(),
topic.getDownCacheSize(), executor, sel, noLocal);
}
-
-
-
+
+
+
public boolean removeDurableSubscription(String clientID, String subscriptionName)
throws Exception
{
DurableSubscription removed = removeDurableSubscriptionInMemory(clientID, subscriptionName);
-
+
if (removed != null)
{
//Now remove from db
deleteMappingRow(removed.getChannelID());
-
+
return true;
}
else
@@ -639,7 +639,7 @@
return false;
}
}
-
+
//FIXME - This doesn't belong here
public String getPreConfiguredClientID(String username) throws Exception
{
@@ -649,24 +649,24 @@
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(selectPreConfClientId);
-
+
ps.setString(1, username);
-
+
rs = ps.executeQuery();
-
+
String clientID = null;
-
+
if (rs.next())
{
clientID = rs.getString(1);
}
-
+
return clientID;
}
catch (SQLException e)
@@ -700,11 +700,11 @@
throw e2;
}
}
-
-
-
+
+
+
// MBean operations ----------------------------------------------
-
+
public String getSqlProperties()
{
try
@@ -718,25 +718,25 @@
return "";
}
}
-
+
public void setSqlProperties(String value)
{
try
{
-
+
ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
sqlProperties = new Properties();
sqlProperties.load(is);
-
+
}
catch (IOException shouldnothappen)
{
log.error("Caught IOException", shouldnothappen);
}
}
-
+
// Public --------------------------------------------------------
-
+
/**
* Managed attribute.
*/
@@ -744,7 +744,7 @@
{
this.dataSourceJNDIName = dataSourceJNDIName;
}
-
+
/**
* Managed attribute.
*/
@@ -752,7 +752,7 @@
{
return dataSourceJNDIName;
}
-
+
/**
* Managed attribute.
*/
@@ -760,7 +760,7 @@
{
this.tmObjectName = tmObjectName;
}
-
+
/**
* Managed attribute.
*/
@@ -784,7 +784,7 @@
{
createTablesOnStartup = b;
}
-
+
/**
* @return a Set<String>. It may return an empty Set, but never null.
*/
@@ -800,75 +800,75 @@
result.addAll(m.keySet());
return result;
}
-
+
public String toString()
{
return "JDBCChannelMapper[" + Integer.toHexString(hashCode()) + "]";
}
-
+
// Package protected ---------------------------------------------
-
+
// Protected -----------------------------------------------------
-
+
protected DurableSubscription removeDurableSubscriptionInMemory(String clientID, String subscriptionName)
throws JMSException
{
if (log.isTraceEnabled()) { log.trace("removing durable subscription " + subscriptionName); }
-
+
if (clientID == null)
{
throw new JMSException("Client ID must be set for connection!");
}
-
+
Map subs = (Map)subscriptions.get(clientID);
-
+
if (subs == null)
{
return null;
}
-
+
if (log.isTraceEnabled()) { log.trace("removing durable subscription " + subscriptionName); }
-
+
DurableSubscription removed = (DurableSubscription)subs.remove(subscriptionName);
-
+
if (removed == null)
{
return null;
}
-
+
if (subs.size() == 0)
{
subscriptions.remove(clientID);
}
-
+
return removed;
}
-
- protected List loadDurableSubscriptionsForTopic(String topicName,
+
+ protected List loadDurableSubscriptionsForTopic(String topicName,
MessageStore ms,
PersistenceManager pm,
MemoryManager mm) throws Exception
- {
+ {
List result = new ArrayList();
-
+
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
int rowCount = -1;
-
+
try
{
conn = ds.getConnection();
ps = conn.prepareStatement(selectSubscriptionsForTopic);
-
+
ps.setString(1, topicName);
-
+
rs = ps.executeQuery();
-
+
rowCount = 0;
-
+
while (rs.next())
{
rowCount ++;
@@ -877,11 +877,11 @@
String subName = rs.getString(3);
String selector = rs.getString(4);
boolean noLocal = rs.getString(5).equals("Y");
-
+
Selector sel = selector == null ? null : new Selector(selector);
-
+
DurableSubscription sub = getDurableSubscription(clientId, subName);
-
+
if (sub == null)
{
sub = createDurableSubscriptionInternal(id, topicName,
@@ -891,7 +891,7 @@
noLocal,
ms, pm, mm);
result.add(sub);
- }
+ }
}
}
catch (SQLException e)
@@ -905,7 +905,7 @@
{
String s = JDBCUtil.statementToString(selectSubscriptionsForTopic, topicName);
log.trace(s + " returned " + rowCount + " rows");
- }
+ }
if (rs != null)
{
rs.close();
@@ -920,12 +920,12 @@
}
wrap.end();
}
-
+
return result;
}
-
+
protected void insertMappingRow(long id, char type, String jmsDestName, String jmsSubName, String clientID,
String selector, Boolean noLocal) throws Exception
{
@@ -933,27 +933,27 @@
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
boolean failed = true;
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(insertMapping);
-
+
ps.setLong(1, id);
-
- ps.setString(2, String.valueOf(type));
-
+
+ ps.setString(2, String.valueOf(type));
+
ps.setString(3, jmsDestName);
-
+
ps.setString(4, jmsSubName);
-
+
ps.setString(5, clientID);
-
+
ps.setString(6, selector);
-
+
if (noLocal == null)
{
ps.setNull(7, Types.VARCHAR);
@@ -962,14 +962,14 @@
{
ps.setString(7, noLocal.booleanValue() ? "Y" : "N");
}
-
+
int rows = ps.executeUpdate();
-
+
failed = rows != 1;
-
+
ps.close();
-
- ps = null;
+
+ ps = null;
}
finally
{
@@ -980,7 +980,7 @@
clientID, selector, noLocal);
log.trace(s + (failed ? " failed!" : " executed successfully"));
}
-
+
if (rs != null)
{
try
@@ -1011,26 +1011,26 @@
wrap.end();
}
}
-
+
protected Long getIdForDestination(boolean isQueue, String destName) throws Exception
{
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(selectIdForDestination);
-
+
ps.setString(1, String.valueOf(isQueue ? TYPE_QUEUE : TYPE_TOPIC));
-
+
ps.setString(2, destName);
-
- rs = ps.executeQuery();
-
+
+ rs = ps.executeQuery();
+
if (rs.next())
{
return new Long(rs.getLong(1));
@@ -1038,7 +1038,7 @@
else
{
return null;
- }
+ }
}
finally
{
@@ -1072,26 +1072,26 @@
wrap.end();
}
}
-
-
+
+
protected boolean deleteMappingRow(long id) throws Exception
{
Connection conn = null;
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
boolean failed = true;
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(deleteMapping);
-
+
ps.setLong(1, id);
-
- failed = ps.executeUpdate() != -1;
-
+
+ failed = ps.executeUpdate() != -1;
+
return !failed;
}
finally
@@ -1122,7 +1122,7 @@
wrap.end();
}
}
-
+
protected DurableSubscription getDurableSubscription(String clientID,
String subscriptionName) throws JMSException
{
@@ -1147,98 +1147,98 @@
subs = new ConcurrentReaderHashMap();
subscriptions.put(clientID, subs);
}
-
+
Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-
+
if (topic == null)
{
throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
}
-
+
// We allocate an executor for the subscription from the rotating pool.
// Currently all subscriptions for the same topic share the same executor.
QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
-
+
DurableSubscription subscription =
new DurableSubscription(id, topic, ms, pm, mm,
topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(),
executor, selector,
noLocal, subscriptionName, clientID);
-
+
subs.put(subscriptionName, subscription);
-
+
return subscription;
- }
-
+ }
+
protected CoreDestination getCoreDestinationInternal(boolean isQueue, String destName)
{
Map m = isQueue ? queues : topics;
-
+
return (CoreDestination)m.get(destName);
}
-
+
protected void initSqlProperties()
{
createUserTable = sqlProperties.getProperty("CREATE_USER_TABLE", createUserTable);
createRoleTable = sqlProperties.getProperty("CREATE_ROLE_TABLE", createRoleTable);
- selectPreConfClientId = sqlProperties.getProperty("SELECT_PRECONF_CLIENTID", selectPreConfClientId);
+ selectPreConfClientId = sqlProperties.getProperty("SELECT_PRECONF_CLIENTID", selectPreConfClientId);
createMappingTable = sqlProperties.getProperty("CREATE_MAPPING_TABLE", createMappingTable);
insertMapping = sqlProperties.getProperty("INSERT_MAPPING", insertMapping);
deleteMapping = sqlProperties.getProperty("DELETE_MAPPING", deleteMapping);
selectIdForDestination = sqlProperties.getProperty("SELECT_ID_FOR_DESTINATION", selectIdForDestination);
selectDurableSub = sqlProperties.getProperty("SELECT_DURABLE_SUB", selectDurableSub);
selectSubscriptionsForTopic = sqlProperties.getProperty("SELECT_SUBSCRIPTIONS_FOR_TOPIC", selectSubscriptionsForTopic);
-
+
for (Iterator i = sqlProperties.entrySet().iterator(); i.hasNext();)
{
Map.Entry entry = (Map.Entry) i.next();
String key = (String) entry.getKey();
if (key.startsWith("POPULATE.TABLES."))
populateTables.add(entry.getValue());
- }
+ }
}
-
+
protected void createSchema() throws Exception
- {
- Connection conn = null;
+ {
+ Connection conn = null;
TransactionWrapper tx = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
try
{
- if (log.isTraceEnabled()) { log.trace("Creating JMS_USERS table"); }
+ if (log.isTraceEnabled()) { log.trace("Creating JMS_USERS table"); }
conn.createStatement().executeUpdate(createUserTable);
}
- catch (SQLException e)
+ catch (SQLException e)
{
log.debug("Failed to create users table: " + createUserTable, e);
}
-
+
try
{
if (log.isTraceEnabled()) { log.trace("Creating JMS_ROLES table"); }
conn.createStatement().executeUpdate(createRoleTable);
}
- catch (SQLException e)
+ catch (SQLException e)
{
log.debug("Failed to create roles table: " + createRoleTable, e);
}
-
+
try
{
if (log.isTraceEnabled()) { log.trace("Creating JMS_SUBSCRIPTIONS table"); }
conn.createStatement().
executeUpdate(createMappingTable);
}
- catch (SQLException e)
+ catch (SQLException e)
{
log.debug("Failed to create subscriptions table: " + createMappingTable, e);
}
-
-
+
+
//Insert user-role data
Iterator iter = populateTables.iterator();
String nextQry = null;
@@ -1247,8 +1247,8 @@
Statement st = null;
try
{
- nextQry = (String) iter.next();
- st = conn.createStatement();
+ nextQry = (String) iter.next();
+ st = conn.createStatement();
st.executeUpdate(nextQry);
}
catch (SQLException ignored)
@@ -1256,7 +1256,7 @@
log.debug("Error populating tables: " + nextQry, ignored);
}
}
-
+
}
finally
{
@@ -1271,14 +1271,14 @@
}
tx.end();
}
-
+
}
-
+
protected long getNextId() throws Exception
{
return channelIDManager.getId();
}
-
+
// Private -------------------------------------------------------
// never access directly
@@ -1300,23 +1300,23 @@
}
// Inner classes -------------------------------------------------
-
+
/*
* TODO This inner class is duplicated from HSQLPersistenceManager - need to avoid code duplication
*/
class TransactionWrapper
{
private javax.transaction.Transaction oldTx;
-
+
private TransactionWrapper() throws Exception
{
TransactionManager tm = getTransactionManagerReference();
oldTx = tm.suspend();
-
+
tm.begin();
}
-
+
private void end() throws Exception
{
TransactionManager tm = getTransactionManagerReference();
@@ -1340,11 +1340,11 @@
}
}
}
-
+
private void exceptionOccurred() throws Exception
{
getTransactionManagerReference().setRollbackOnly();
}
}
-
+
}
Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-01-11 17:27:19 UTC (rev 1960)
@@ -31,6 +31,7 @@
import javax.naming.InitialContext;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.message.JBossMessage;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -43,39 +44,39 @@
public class QueueTest extends MessagingTestCase
{
// Constants -----------------------------------------------------
-
+
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
protected InitialContext ic;
protected ConnectionFactory cf;
// Constructors --------------------------------------------------
-
+
public QueueTest(String name)
{
super(name);
}
-
+
// TestCase overrides -------------------------------------------
-
+
public void setUp() throws Exception
{
- super.setUp();
-
+ super.setUp();
+
ServerManagement.start("all");
-
-
+
+
ic = new InitialContext(ServerManagement.getJNDIEnvironment());
cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
-
+
ServerManagement.undeployQueue("TestQueue");
ServerManagement.deployQueue("TestQueue");
log.debug("setup done");
}
-
+
public void tearDown() throws Exception
{
ServerManagement.undeployQueue("TestQueue");
@@ -83,8 +84,8 @@
log.debug("tear down done");
}
-
-
+
+
// Public --------------------------------------------------------
/**
@@ -117,6 +118,62 @@
}
}
+ /**
+ * The simplest possible queue test.
+ */
+ public void testRedeployQueue() throws Exception
+ {
+ Queue queue = (Queue)ic.lookup("/queue/TestQueue");
+
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ MessageConsumer c = s.createConsumer(queue);
+ conn.start();
+
+ for (int i = 0; i < 500; i++)
+ {
+ p.send(s.createTextMessage("payload " + i));
+ }
+
+ //ServerManagement.undeployQueue("TestQueue");
+
+ log.info("Stopping server");
+ ServerManagement.stopServerPeer();
+
+ log.info("Starting server");
+ ServerManagement.startServerPeer();
+ ServerManagement.deployQueue("TestQueue");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+ cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ conn = cf.createConnection();
+ s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ p = s.createProducer(queue);
+ c = s.createConsumer(queue);
+ conn.start();
+
+ for (int i = 0; i < 500; i++)
+ {
+ TextMessage message = (TextMessage)c.receive(3000);
+ assertNotNull(message);
+ assertNotNull(message.getJMSDestination());
+ }
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
public void testQueueName() throws Exception
{
Queue queue = (Queue)ic.lookup("/queue/TestQueue");
@@ -124,12 +181,12 @@
}
// Package protected ---------------------------------------------
-
+
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
-
+
}
Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java 2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java 2007-01-11 17:27:19 UTC (rev 1960)
@@ -23,6 +23,9 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
import javax.naming.InitialContext;
import javax.sql.DataSource;
@@ -35,13 +38,18 @@
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.plugin.contract.ChannelMapper;
import org.jboss.jms.server.subscription.DurableSubscription;
+import org.jboss.jms.message.JBossTextMessage;
import org.jboss.messaging.core.local.CoreDestination;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.local.Topic;
import org.jboss.messaging.core.persistence.JDBCUtil;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.Channel;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleChannel;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.tm.TransactionManagerService;
import org.jboss.util.id.GUID;
@@ -50,9 +58,10 @@
* These tests must not be run in remote mode!
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>1.1</tt>
+ * @author <a href="mailto:Galder.Zamarreno at jboss.com">Galder Zamarreno</a>
+ * @version <tt>$Release$</tt>
*
- * JDBCChannelMapperTest.java,v 1.1 2006/02/28 16:48:15 timfox Exp
+ * $Id: JDBCChannelMapperTest.java 1958 2007-01-11 16:55:40Z clebert.suconic at jboss.com $
*/
public class JDBCChannelMapperTest extends MessagingTestCase
{
@@ -87,18 +96,66 @@
ms = ServerManagement.getMessageStore();
pm = ServerManagement.getPersistenceManager();
channelMapper = ServerManagement.getChannelMapper();
-
+
log.debug("setup done");
}
public void tearDown() throws Exception
{
log.debug("starting tear down");
-
+
super.tearDown();
}
+ // I had to place this test as the firs one one the list..
+ // it's dependent on the channelMapper empty..
+ // It won't affect the other tests
+ public void testSendRedeployAndConsume() throws Throwable
+ {
+ /* Deploy 'paradise' as destination queue */
+ channelMapper.deployCoreDestination(true, "paradise", ms, pm, null, 100, 20, 10);
+ /* Deploy 'earth' as queue to reply to */
+ channelMapper.deployCoreDestination(true, "earth", ms, pm, null, 100, 20, 10);
+
+ Channel channel = new SimpleChannel(0, ms);
+ /* Create a message for 'paradise' queue destination */
+ Message[] m = createMessages(1);
+ List refs = new ArrayList();
+ MessageReference ref1 = ms.reference(m[0]);
+
+ ref1.setOrdering(11);
+ refs.add(ref1);
+
+ /* We persist the message */
+ pm.addReferences(channel.getChannelID(), refs, false);
+
+ /* Delete the message from memory so that on redeployment, messaging is forced
+ * to retrieve it from the database */
+ ms.forgetMessage(0);
+
+ log.info("********************** UnDeploying coreDestination");
+ channelMapper.undeployCoreDestination(true, "paradise");
+ channelMapper.undeployCoreDestination(true, "earth");
+
+ log.info("********************** Deploying coreDestination");
+
+ /* We redeploy the target destination which will load the message from the database */
+ channelMapper.deployCoreDestination(true, "paradise", ms, pm, null, 100, 20, 10);
+
+ JBossDestination jd = channelMapper.getJBossDestination(0);
+ Queue q = (Queue) channelMapper.getCoreDestination(jd);
+
+ /* Browse the pending messages */
+ List l = q.browse();
+ JBossTextMessage message = (JBossTextMessage)l.get(0);
+ /* Browse the pending messages. There's only one and its destination should not be
+ * null, in fact, it should be 'paradise' */
+ assertNotNull(message.getJMSDestination());
+ assertEquals("Destination should be 'paradise'","paradise",
+ ((JBossDestination)message.getJMSDestination()).getName());
+ }
+
public void testCreateGetRemoveDurableSubscription() throws Exception
{
String topicName = new GUID().toString();
@@ -214,133 +271,131 @@
assertNotNull(theClientID);
assertEquals(clientID, theClientID);
-
+
}
-
+
public void testGetDeployCoreDestinationTest() throws Exception
{
//Lookup a non existent core destination - verify returns null
-
+
JBossQueue queue = new JBossQueue("queue1");
-
+
JBossTopic topic = new JBossTopic("topic1");
-
+
CoreDestination cd = channelMapper.getCoreDestination(queue);
-
+
assertNull(cd);
-
+
cd = channelMapper.getCoreDestination(topic);
-
+
assertNull(cd);
-
+
//Lookup a non existent jboss destination - verify returns null
-
+
JBossDestination jbd = channelMapper.getJBossDestination(123);
-
+
assertNull(jbd);
-
+
//Deploy core destinations
-
+
channelMapper.deployCoreDestination(true, "queue1", ms, pm, null, 100, 20, 10);
-
+
channelMapper.deployCoreDestination(false, "topic1", ms, pm, null, 100, 20, 10);
-
+
//Lookup core dest
-
+
CoreDestination cd1 = channelMapper.getCoreDestination(queue);
-
+
assertNotNull(cd1);
-
+
assertTrue(cd1 instanceof Queue);
-
+
Queue q = (Queue)cd1;
-
+
CoreDestination cd2 = channelMapper.getCoreDestination(topic);
-
+
assertNotNull(cd2);
-
+
assertTrue(cd2 instanceof Topic);
-
+
Topic t = (Topic)cd2;
-
+
//Lookup jboss dest
-
+
JBossDestination jb1 = channelMapper.getJBossDestination(q.getChannelID());
-
+
assertNotNull(jb1);
-
+
assertEquals("queue1", jb1.getName());
-
+
JBossDestination jb2 = channelMapper.getJBossDestination(t.getId());
-
+
assertNotNull(jb2);
-
+
assertEquals("topic1", jb2.getName());
-
+
//undeploy core dest
-
+
CoreDestination cd3 = channelMapper.undeployCoreDestination(true, "queue1");
-
+
assertNotNull(cd3);
-
+
assertEquals(q.getChannelID(), cd3.getId());
-
+
CoreDestination cd4 = channelMapper.undeployCoreDestination(false, "topic1");
-
+
assertNotNull(cd3);
-
+
assertEquals(t.getId(), cd4.getId());
-
+
cd3 = channelMapper.undeployCoreDestination(true, "queue1");
-
+
assertNull(cd3);
-
+
cd4 = channelMapper.undeployCoreDestination(false, "topic1");
-
+
assertNull(cd4);
-
+
//Lookup core dest - null
-
+
cd3 = channelMapper.getCoreDestination(queue);
-
+
assertNull(cd3);
-
+
cd4 = channelMapper.getCoreDestination(topic);
-
+
assertNull(cd4);
-
+
//lookup jboss dest - null
-
+
jb1 = channelMapper.getJBossDestination(q.getChannelID());
-
+
assertNull(jb1);
-
+
jb2 = channelMapper.getJBossDestination(t.getId());
-
+
assertNull(jb2);
-
+
//Deploy a core dest
-
+
channelMapper.deployCoreDestination(true, "queue1", ms, pm, null, 100, 20, 10);
-
+
channelMapper.deployCoreDestination(false, "topic1", ms, pm, null, 100, 20, 10);
-
+
//lookup core dest - verify has same id
-
+
cd3 = channelMapper.getCoreDestination(queue);
-
+
assertNotNull(cd3);
-
+
assertEquals(cd3.getId(), cd1.getId());
-
+
cd4 = channelMapper.getCoreDestination(topic);
-
+
assertNotNull(cd4);
-
+
assertEquals(cd4.getId(), cd2.getId());
}
-
-
// Package protected ---------------------------------------------
@@ -348,6 +403,35 @@
// Private -------------------------------------------------------
+ private Message[] createMessages(int num) throws Throwable
+ {
+ //Generate some messages with a good range of attribute values
+ Message[] messages = new Message[num];
+ for (int i = 0; i < num; i++)
+ {
+ messages[i] = createMessage((byte)i, true);
+ }
+ return messages;
+ }
+
+ private Message createMessage(byte i, boolean reliable) throws Throwable
+ {
+ return new JBossTextMessage(i,
+ reliable,
+ System.currentTimeMillis() + 1000 * 60 * 60,
+ System.currentTimeMillis(),
+ (byte)(i % 10),
+ new HashMap(),
+ "hello".getBytes(),
+ 0,
+ "msg",
+ "1",
+ "1".getBytes(),
+ new JBossQueue("paradise"),
+ new JBossQueue("earth"),
+ new HashMap());
+ }
+
// Inner classes -------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list