rhmessaging commits: r4051 - store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb.
Author: rgemmell
Date: 2010-06-25 07:59:53 -0400 (Fri, 25 Jun 2010)
New Revision: 4051
Modified:
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Output the correct count for the Content visitor; update the wording of the result messages generally
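For reference, the corrected result line now counts the visitor that actually ran and pluralises "record". A quick Python rendering of the Java logCount helper in the hunk below, illustrative only (the helper itself is Java):

    def log_count(count, item):
        # e.g. " 1 Message Content record" or " 3 Delivery records"
        return " %d %s %s" % (count, item, "record" if count == 1 else "records")

    print(log_count(1, "Message Content"))
    print(log_count(3, "Delivery"))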
Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-25 09:44:25 UTC (rev 4050)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-25 11:59:53 UTC (rev 4051)
@@ -408,7 +408,7 @@
//Migrate _queueBindingsDb;
- _logger.info("QueueBindings");
+ _logger.info("Queue Bindings");
moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
//Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
@@ -537,12 +537,12 @@
};
_oldMessageStore.visitContentDb(contentVisitor);
- logCount(metaDataVisitor.getVisitedCount(), "Message ContentChunk");
+ logCount(contentVisitor.getVisitedCount(), "Message Content");
//Migrate _deliveryDb;
_logger.info("Delivery Records");
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
+ moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery");
}
/**
@@ -553,7 +553,7 @@
*/
private void logCount(int count, String item)
{
- _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries"));
+ _logger.info(" " + count + " " + item + " " + (count == 1 ? "record" : "records"));
}
/**
rhmessaging commits: r4050 - in store/trunk/java/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/records and 1 other directories.
Author: rgemmell
Date: 2010-06-25 05:44:25 -0400 (Fri, 25 Jun 2010)
New Revision: 4050
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial work on v2->v3 BDBMessageStore upgrade tool
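The heart of the tool is the content rekeying visible in the BDBStoreUpgrade hunks below: old chunk keys (apparently message id plus chunk number, per MessageContentKey_1) become message id plus byte offset (MessageContentKey_3). A minimal Python sketch of that rekeying pass, assuming entries arrive ordered by key as a BDB cursor scan would deliver them; all names here are illustrative, the real code is the Java visitor in the diff:

    def rekey_content(old_entries):
        # old_entries: iterable of ((msg_id, chunk_no), chunk_bytes), key-ordered
        prev_msg_id = None
        bytes_seen = 0
        for (msg_id, _chunk_no), chunk in old_entries:
            if msg_id != prev_msg_id:
                bytes_seen = 0              # new message: restart the offset count
            yield (msg_id, bytes_seen), chunk
            prev_msg_id = msg_id
            bytes_seen += len(chunk)

    old = [((7, 0), b"abc"), ((7, 1), b"de"), ((9, 0), b"xyz")]
    print(list(rekey_content(old)))
    # -> [((7, 0), b'abc'), ((7, 3), b'de'), ((9, 0), b'xyz')]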
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-25 09:44:25 UTC (rev 4050)
@@ -1043,14 +1043,28 @@
_log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
}
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+ queue.getOwner(), queue.isExclusive(), arguments);
+
+ createQueue(queueRecord);
+ }
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * Only intended for direct use during store upgrades.
+ *
+ * @param queueRecord Details of the queue to store.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ protected void createQueue(QueueRecord queueRecord) throws AMQException
+ {
if (_state != State.RECOVERING)
{
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), queue.isExclusive(), arguments);
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getNameShortString(), key);
+ keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
@@ -1062,7 +1076,8 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+ throw new AMQException("Error writing AMQQueue with name " +
+ queueRecord.getNameShortString().toString() + " to database: " + e, e);
}
}
}
@@ -1477,14 +1492,9 @@
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQException
- {
- final int limit = offset + dst.remaining();
-
+ {
DatabaseEntry contentKeyEntry = new DatabaseEntry();
- //TODO: if requested offset is non zero, use partial record (key-only) search to
- //locate the first record key to prevent reading in data we dont need
-
//Start from 0 offset and search for the starting chunk.
MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
@@ -1505,7 +1515,7 @@
{
cursor = _messageContentDb.openCursor(null, null);
- OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
@@ -1524,7 +1534,6 @@
seenSoFar += size;
- //TODO: can change this guard if we start recording the last byte in the chunk record
if(seenSoFar >= offset)
{
byte[] dataAsBytes = buf.array();
@@ -1600,6 +1609,11 @@
{
return _bindingTupleBindingFactory;
}
+
+ protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
+ {
+ return _metaDataTupleBindingFactory;
+ }
//Package getters for the various databases used by the Store
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-06-25 09:44:25 UTC (rev 4050)
@@ -52,6 +52,11 @@
{
return _exclusive;
}
+
+ public void setExclusive(boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
public FieldTable getArguments()
{
Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-25 09:44:25 UTC (rev 4050)
@@ -22,15 +22,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_1;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
@@ -39,13 +42,14 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -53,21 +57,15 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
/**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2 Store.
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V2 Store to a V3 Store.
*
- * NOTE: No checks are in place to validate that the input is V1.
+ * Currently upgrade is fixed from v2 -> v3
*
- * Currently upgrade is fixed from v1 -> v2
- * Only the Queue and Binding databases are migrated all other databases are copied as DB entries.
- *
* Improvements:
* - Add List BDBMessageStore.getDatabases(); This can then be iterated to guard against new DBs being added.
- * - Add a version value into the store so that a quick check can be performed to perform the upgrades.
* - A version in the store would allow automated upgrade or, later with more versions available, interactive upgrade.
- * - Currently only the Queue and Binding DB are processed for upgrade all the other db data is copied between stores.
* - Add process logging and disable all Store and Qpid logging.
*/
public class BDBStoreUpgrade
@@ -84,10 +82,6 @@
BDBMessageStore _oldMessageStore;
/** The New Store */
BDBMessageStore _newMessageStore;
- /** A VHost used in the migration of the queues from the old VHost */
- VirtualHost _newVirtualHost;
- /** A VHost used in the creation of the queues from the old store */
- VirtualHost _oldVirtualHost;
/** The file ending that is used by BDB Store Files */
private static final String BDB_FILE_ENDING = ".jdb";
@@ -96,7 +90,7 @@
private boolean _interactive;
private boolean _force;
- private static final String VERSION = "1.0";
+ private static final String VERSION = "2.0";
private static final String OPTION_INPUT_SHORT = "i";
private static final String OPTION_INPUT = "input";
private static final String OPTION_OUTPUT_SHORT = "o";
@@ -312,7 +306,7 @@
if (!userInteract("Are you sure you wish to proceed with DB migration without backup? " +
"(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
{
- throw new IllegalArgumentException("Upgrade stopped as user request as no DB Backup performed.");
+ throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
}
}
}
@@ -341,13 +335,9 @@
CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
- // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
- _newVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
- _oldVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
-
//Create a new messageStore
_newMessageStore = new BDBMessageStore();
- //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
+ _newMessageStore.configure(toDir, false);
try
{
@@ -355,25 +345,19 @@
switch (version)
{
default:
- case 1:
- _oldMessageStore = new BDBMessageStore(1);
- //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ case 2:
+ _oldMessageStore = new BDBMessageStore(2);
+ _oldMessageStore.configure(fromDir, true);
_oldMessageStore.start();
- upgradeFromVersion_1();
+ upgradeFromVersion_2();
break;
}
}
finally
{
- _newVirtualHost.close();
- _oldVirtualHost.close();
-
_newMessageStore.close();
_oldMessageStore.close();
- //Shutdown the AR that the Vhosts will have created.
- ApplicationRegistry.remove(1);
-
// if we are running inplace then swap fromDir and toDir
if (inplace)
{
@@ -395,84 +379,170 @@
}
}
- private void upgradeFromVersion_1() throws AMQException, DatabaseException
+ private void upgradeFromVersion_2() throws AMQException, DatabaseException
{
+ _logger.info("Starting store upgrade from version 2");
+
+ //Migrate _exchangeDb;
+ _logger.info("Exchanges");
- _logger.info("Starting store upgrade from version 1");
+ moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
- _logger.info("Message Metadata");
- //Migrate _messageMetaDataDb;
- moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(), "Message MetaData");
+ final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
+ final TupleBinding exchangeTB = new ExchangeTB();
+
+ DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
+ AMQShortString type = exchangeRec.getType();
- _logger.info("Message Contents");
- //Migrate _messageContentDb;
- moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(), "Message Content");
+ if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
+ {
+ topicExchanges.add(exchangeRec.getNameShortString());
+ }
+ }
+ };
+ _oldMessageStore.visitExchanges(exchangeListVisitor);
+
+ //Migrate _queueBindingsDb;
+ _logger.info("QueueBindings");
+ moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
+
+ //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
+ //which have a colon in their name and are bound to the Topic exchanges above
+ final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
+ final TupleBinding bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+
+ DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
+ AMQShortString queueName = bindingRec.getQueueName();
+ AMQShortString exchangeName = bindingRec.getExchangeName();
+
+ if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
+ {
+ durableSubQueues.add(queueName);
+ }
+ }
+ };
+ _oldMessageStore.visitBindings(durSubQueueListVisitor);
+
+
+ //Migrate _queueDb;
_logger.info("Queues");
- //Migrate _queueDb;
- //Get the oldMessageStore Tuple Binding which does the parsing
+
final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
- //Create a visitor that will take the queues in the oldMessageStore and add them to the newMessageStore
DatabaseVisitor queueVisitor = new DatabaseVisitor()
{
public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
{
- AMQQueue queue = (AMQQueue) queueTupleBinding.entryToObject(value);
+ QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
+ AMQShortString queueName = queueRec.getNameShortString();
- //The simple call to createQueue with the AMQQueue object is sufficient for a v1 upgrade as all the
- // extra properties in v2 will be defaulted.
- _newMessageStore.createQueue(queue);
+ //if the queue name is in the gathered list then set its exclusivity true
+ if (durableSubQueues.contains(queueName))
+ {
+ _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
+ queueRec.setExclusive(true);
+ }
+
+ //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
+ //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
+ _newMessageStore.createQueue(queueRec);
- // We need to call queue stop here as all the queues were already registerd when the _oldMessageStore
- // state was recovered. Now we are creating a second Queue it will aquire the Executor Service again!
- // But the queueRegistry is a set so only one release will be performed.
- //
- // queue.stop();
- //
- // An alternative approach was taken here: If we don't recover the store
_count++;
}
};
- //Perform the visit
_oldMessageStore.visitQueues(queueVisitor);
logCount(queueVisitor.getVisitedCount(), "queue");
- _logger.info("Delivery Records");
- //Migrate _deliveryDb;
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
- _logger.info("Exchanges");
- //Migrate _exchangeDb;
- moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
+ //Migrate _messageMetaDataDb;
+ _logger.info("Message MetaData");
+
+ final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
+ final TupleBinding oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+ final TupleBinding newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+
+ DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
- _logger.info("QueueBindings");
- //Migrate _queueBindingsDb;
- final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+ DatabaseEntry newValue = new DatabaseEntry();
+ newMetaDataTupleBinding.objectToEntry(metaData, newValue);
+
+ newMetaDataDB.put(null, key, newValue);
- //Create a visitor that to read the old format queue bindings
- DatabaseVisitor queueBindings = new DatabaseVisitor()
+ _count++;
+ }
+ };
+ _oldMessageStore.visitMetaDataDb(metaDataVisitor);
+
+ logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
+
+
+ //Migrate _messageContentDb;
+ _logger.info("Message Contents");
+ final Database newContentDB = _newMessageStore.getContentDb();
+
+ final TupleBinding oldContentKeyTupleBinding = new MessageContentKeyTB_1();
+ final TupleBinding newContentKeyTupleBinding = new MessageContentKeyTB_3();
+ final TupleBinding contentTB = new ContentTB();
+
+ DatabaseVisitor contentVisitor = new DatabaseVisitor()
{
+ long _prevMsgId = -1; //Initialise to invalid value
+ int _bytesSeenSoFar = 0;
+
public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
{
- BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
+ //determine the msgId of the current entry
+ MessageContentKey_1 contentKey = (MessageContentKey_1) oldContentKeyTupleBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
- //Create a new Format TupleBinding
- TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
+ //if this is a new message, restart the byte offset count.
+ if(_prevMsgId != msgId)
+ {
+ _bytesSeenSoFar = 0;
+ }
- DatabaseEntry newKey = new DatabaseEntry();
- newBindingTupleBinding.objectToEntry(queueBinding, newKey);
+ //determine the content size
+ ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
+ int contentSize = content.limit();
- ByteBinding.byteToEntry((byte) 0, value);
- _newMessageStore.getBindingsDb().put(null, newKey, value);
+ //create the new key: id + previously seen data count
+ MessageContentKey_3 newKey = new MessageContentKey_3(msgId, _bytesSeenSoFar);
+ DatabaseEntry newKeyEntry = new DatabaseEntry();
+ newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
+ DatabaseEntry newValueEntry = new DatabaseEntry();
+ contentTB.objectToEntry(content, newValueEntry);
+
+ newContentDB.put(null, newKeyEntry, newValueEntry);
+
+ _prevMsgId = msgId;
+ _bytesSeenSoFar += contentSize;
+
_count++;
}
};
+ _oldMessageStore.visitContentDb(contentVisitor);
- _oldMessageStore.visitBindings(queueBindings);
- logCount(queueBindings.getVisitedCount(), "queue binding");
+ logCount(metaDataVisitor.getVisitedCount(), "Message ContentChunk");
+
+
+ //Migrate _deliveryDb;
+ _logger.info("Delivery Records");
+ moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
}
/**
@@ -713,7 +783,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!("User aborted process").equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -762,7 +832,7 @@
_logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
try
{
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(1);
+ new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(2);
_logger.info("Upgrade complete.");
}
@@ -772,9 +842,9 @@
}
catch (DatabaseException de)
{
- if (de.getMessage().endsWith("Error: Unable to load BDBStore as version 1. Store on disk contains version 2 data."))
+ if (de.getMessage().endsWith("Error: Unable to load BDBStore as version 2. Store on disk contains version 3 data."))
{
- System.out.println("Store '" + fromDir + "' has already been upgraded to version 2.");
+ System.out.println("Store '" + fromDir + "' has already been upgraded to version 3.");
}
else
{
@@ -784,7 +854,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!("User aborted process").equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
rhmessaging commits: r4049 - mgmt/newdata/cumin/python/cumin.
Author: eallen
Date: 2010-06-24 16:48:55 -0400 (Thu, 24 Jun 2010)
New Revision: 4049
Modified:
mgmt/newdata/cumin/python/cumin/model.py
Log:
Use _qmf_agent_id as the key for the submission_job_summaries store, since the rosemary object will change every update interval
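The motivation: each update interval produces a fresh rosemary object for the same submission, so a dict keyed on the object itself never gets a hit and leaks one store per interval. A small illustrative Python sketch of the failure mode and the fix (the Submission class here is hypothetical):

    class Submission(object):
        def __init__(self, agent_id):
            self._qmf_agent_id = agent_id    # stable across rebuilds

    stores_by_object, stores_by_id = {}, {}

    for _ in range(3):
        sub = Submission("agent-42")         # rebuilt every update interval
        stores_by_object.setdefault(sub, object())
        stores_by_id.setdefault(sub._qmf_agent_id, object())

    print(len(stores_by_object))   # 3: one leaked store per interval
    print(len(stores_by_id))       # 1: the same store is reused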
Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py 2010-06-24 19:18:52 UTC (rev 4048)
+++ mgmt/newdata/cumin/python/cumin/model.py 2010-06-24 20:48:55 UTC (rev 4049)
@@ -93,12 +93,12 @@
try:
try:
- store = self.job_summaries_by_submission[submission]
+ store = self.job_summaries_by_submission[submission._qmf_agent_id]
except KeyError:
store = SubmissionJobSummaryStore(self, submission)
store.start_updates()
- self.job_summaries_by_submission[submission] = store
+ self.job_summaries_by_submission[submission._qmf_agent_id] = store
for i in range(5):
if store.data:
rhmessaging commits: r4048 - mgmt/newdata/cumin/bin.
Author: justi9
Date: 2010-06-24 15:18:52 -0400 (Thu, 24 Jun 2010)
New Revision: 4048
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
Log:
Improve submit test case
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-24 13:34:15 UTC (rev 4047)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-24 19:18:52 UTC (rev 4048)
@@ -50,6 +50,8 @@
try:
test(cumin, cursor)
except:
+ print_exc()
+
print "FAILES!"
try:
@@ -75,14 +77,16 @@
cls = cumin.model.com_redhat_grid.Scheduler
obj = cls.get_object(cursor)
- print "Submitting to", obj,
+ assert obj
+ print "Submitting to", obj.Name,
+
ad = {"Cmd": "/bin/sleep",
"Args": "5m",
"JobUniverse": 5,
"Requirements": "TRUE",
"Iwd": "/tmp",
- "Owner": "gridmonkey@%s" % obj.Machine,
+ "Owner": "gridmonkey",
"!!descriptors": {"Requirements": "com.redhat.grid.Expression"}}
completed = Event()
rhmessaging commits: r4047 - mgmt/newdata/mint/python/mint.
Author: justi9
Date: 2010-06-24 09:34:15 -0400 (Thu, 24 Jun 2010)
New Revision: 4047
Modified:
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
* Move get_object to MintAgent; add a delete_object there
* Set a max queue size for the update thread; this obviates the yield
logic that was present before
* Use just one cursor, and pipe it through to functions where needed
* Use the incoming qmf update time to reckon whether a sample is too
frequent to keep
* A more performant method for dispatching to value transform
functions (see the sketch after this list)
* Move model agent delete to after the AgentDelete update is
completed
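On the transform dispatch point: the update.py hunk below replaces per-value type tests with a 32-entry table of functions indexed by QMF type code. A self-contained sketch of the same idea (type codes as in the diff; the real transform_value receives the QMF attribute and reads attr.type, while this sketch takes the code directly):

    import pickle
    from datetime import datetime

    def transform_default(value):
        return value

    def transform_timestamp(value):
        if value != 0:                        # an absTime of 0 means "unset"
            return datetime.fromtimestamp(value / 1000000000)

    transformers = [transform_default] * 32   # one slot per QMF type code
    transformers[8] = transform_timestamp     # absTime
    transformers[10] = str                    # objId
    transformers[14] = str                    # uuid
    transformers[15] = pickle.dumps           # map

    def transform_value(type_code, value):
        return transformers[type_code](value)

    print(transform_value(10, 12345))                  # '12345'
    print(transform_value(8, 1277000000 * 10 ** 9))    # a datetime in June 2010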
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/model.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,5 +78,28 @@
self.model = None
+ def get_object(self, cursor, cls, object_id):
+ try:
+ return self.objects_by_id[object_id]
+ except KeyError:
+ obj = RosemaryObject(cls, None)
+ obj._qmf_agent_id = self.id
+ obj._qmf_object_id = object_id
+
+ try:
+ cls.load_object_by_qmf_id(cursor, obj)
+ except RosemaryNotFound:
+ obj._id = cls.get_new_id(cursor)
+
+ self.objects_by_id[object_id] = obj
+
+ return obj
+
+ def delete_object(self, object_id):
+ try:
+ del self.objects_by_id[object_id]
+ except KeyError:
+ pass
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,8 +78,6 @@
except KeyError:
return
- agent.delete()
-
if not self.model.app.update_thread.isAlive():
return
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -16,35 +16,25 @@
def __init__(self, app):
super(UpdateThread, self).__init__(app)
- self.updates = ConcurrentQueue()
+ self.updates = ConcurrentQueue(maxsize=1000)
self.stats = UpdateStats(self.app)
self.conn = None
- self.read_cursor = None
- self.write_cursor = None
+ self.cursor = None
self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
- self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
- self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.cursor.stats = self.stats
- self.read_cursor.stats = self.stats
- self.write_cursor.stats = self.stats
-
def enqueue(self, update):
self.updates.put(update)
self.stats.enqueued += 1
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
-
- if self.updates.qsize() > 1000:
- sleep(0)
-
def run(self):
while True:
if self.stop_requested:
@@ -152,7 +142,7 @@
log.debug("Processing %s", self)
try:
- self.do_process(thread.write_cursor, thread.stats)
+ self.do_process(thread.cursor, thread.stats)
thread.conn.commit()
except UpdateException, e:
@@ -185,8 +175,7 @@
self.object = obj
def do_process(self, cursor, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
+ cls, obj = self.get_object(cursor)
update_time, create_time, delete_time = self.object.getTimestamps()
update_time = datetime.fromtimestamp(update_time / 1000000000)
@@ -200,7 +189,7 @@
# We don't have this object yet
stats.dropped += 1; return
- if stats.enqueued - stats.dequeued > 1000:
+ if stats.enqueued - stats.dequeued > 500:
if update_time < now - minutes_ago:
# The sample is too old
stats.dropped += 1; return
@@ -215,7 +204,7 @@
sample_columns = list()
self.process_headers(obj, object_columns)
- self.process_properties(obj, object_columns)
+ self.process_properties(obj, object_columns, cursor)
self.process_statistics(obj, object_columns, sample_columns)
statements = list()
@@ -246,7 +235,7 @@
statements.append(sql)
stats.sampled += 1
- obj._sample_time = now
+ obj._sample_time = update_time
if statements:
text = "; ".join(statements)
@@ -268,7 +257,7 @@
else:
stats.dropped += 1
- def get_class(self):
+ def get_object(self, cursor):
class_key = self.object.getClassKey()
name = class_key.getPackageName()
@@ -284,31 +273,13 @@
cls = pkg._classes_by_lowercase_name[name.lower()]
except KeyError:
raise ClassUnknown(name)
-
- return cls
- def get_object(self, cls, object_id):
- try:
- return self.agent.objects_by_id[object_id]
- except KeyError:
- cursor = self.model.app.update_thread.read_cursor
+ object_id = self.object.getObjectId().objectName
- obj = RosemaryObject(cls, None)
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = object_id
+ obj = self.agent.get_object(cursor, cls, object_id)
+
+ return cls, obj
- #try:
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
- #finally:
- # cursor.close()
-
- self.agent.objects_by_id[object_id] = obj
-
- return obj
-
def process_headers(self, obj, columns):
table = obj._class.sql_table
@@ -337,13 +308,14 @@
columns.append(table._qmf_class_key)
columns.append(table._qmf_create_time)
- def process_properties(self, obj, columns):
+ def process_properties(self, obj, columns, cursor):
cls = obj._class
for prop, value in self.object.getProperties():
try:
if prop.type == 10:
- col, nvalue = self.process_reference(cls, prop, value)
+ col, nvalue = self.process_reference \
+ (cls, prop, value, cursor)
else:
col, nvalue = self.process_value(cls, prop, value)
except MappingException, e:
@@ -359,7 +331,7 @@
setattr(obj, col.name, nvalue)
columns.append(col)
- def process_reference(self, cls, prop, value):
+ def process_reference(self, cls, prop, value, cursor):
try:
ref = cls._references_by_name[prop.name]
except KeyError:
@@ -374,9 +346,9 @@
try:
that_id = str(value.objectName)
except:
- raise MappingException("XXX ref isn't an oid")
+ raise MappingException("Reference isn't an oid")
- that = self.get_object(ref.that_cls, that_id)
+ that = self.agent.get_object(cursor, ref.that_cls, that_id)
if not that._sync_time:
msg = "Referenced object %s hasn't appeared yet"
@@ -393,26 +365,10 @@
raise MappingException("Property %s is unknown" % prop)
if value is not None:
- value = self.transform_value(prop, value)
+ value = transform_value(prop, value)
return col, value
- def transform_value(self, attr, value):
- if attr.type == 8: # absTime
- if value == 0:
- value = None
- else:
- value = datetime.fromtimestamp(value / 1000000000)
- # XXX value = TimestampFromTicks(value / 1000000000)
- elif attr.type == 15: # map
- value = pickle.dumps(value)
- elif attr.type == 10: # objId
- value = str(value)
- elif attr.type == 14: # uuid
- value = str(value)
-
- return value
-
def process_statistics(self, obj, update_columns, insert_columns):
for stat, value in self.object.getStatistics():
try:
@@ -423,7 +379,7 @@
continue
if value is not None:
- value = self.transform_value(stat, value)
+ value = transform_value(stat, value)
# XXX hack workaround
if col.name == "MonitorSelfTime":
@@ -450,17 +406,13 @@
class ObjectDelete(ObjectUpdate):
def do_process(self, cursor, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
+ cls, obj = self.get_object(cursor)
self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
obj.delete(cursor)
- try:
- del self.agent.objects_by_id[self.object.getObjectId().objectName]
- except KeyError:
- pass
+ self.agent.delete_object(obj._qmf_object_id)
stats.deleted += 1
@@ -483,6 +435,8 @@
stats.deleted += 1
+ self.agent.delete()
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
@@ -501,3 +455,23 @@
class MappingException(Exception):
pass
+
+def transform_default(value):
+ return value
+
+def transform_timestamp(value):
+ if value != 0:
+ return datetime.fromtimestamp(value / 1000000000)
+
+def transform_pickle(value):
+ return pickle.dumps(value)
+
+transformers = list([transform_default for x in range(32)])
+
+transformers[8] = transform_timestamp
+transformers[10] = str
+transformers[14] = str
+transformers[15] = transform_pickle
+
+def transform_value(attr, value):
+ return transformers[attr.type](value)
rhmessaging commits: r4046 - mgmt/newdata/cumin/bin.
Author: justi9
Date: 2010-06-21 17:51:13 -0400 (Mon, 21 Jun 2010)
New Revision: 4046
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
Log:
Make cumin-smoke-test an entry point for multiple, individually selectable smoke tests; add a submit smoke test
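The selection mechanism introduced below is name-based: any module-level function called _test_<name> is discoverable through globals() and can be run singly or as part of the whole suite. A Python 3 sketch of the pattern with hypothetical test names:

    import sys

    def _test_submit():
        print("submit test ran")

    def _test_job_summaries():
        print("job summaries test ran")

    def main():
        attrs = globals()
        chosen = sys.argv[1] if len(sys.argv) > 1 else None
        if chosen:
            name = "_test_%s" % chosen
            assert name in attrs, "no such smoke test: %s" % chosen
            attrs[name]()
        else:
            for name in sorted(x for x in attrs if x.startswith("_test_")):
                attrs[name]()

    if __name__ == "__main__":
        main()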
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-21 19:15:25 UTC (rev 4045)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-21 21:51:13 UTC (rev 4046)
@@ -44,28 +44,70 @@
conn = cumin.database.get_connection()
cursor = conn.cursor()
- # cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
- cls = cumin.model.com_redhat_grid.Submission
- obj = cls.get_object(cursor)
+ def invoke_test(test):
+ print "[%s]" % name[6:]
- print "Calling method on", obj
+ try:
+ test(cumin, cursor)
+ except:
+ print "FAILES!"
- summs = cumin.model.get_submission_job_summaries(obj)
+ try:
+ chosen = sys.argv[1]
+ except IndexError:
+ chosen = None
- pprint(summs)
+ attrs = globals()
- #completed = Event()
-
- #def completion(x, y):
- # print x, y
- # completed.set()
+ if chosen:
+ name = "_test_%s" % chosen
- #cumin.session.call_method(completion, obj, "GetJobSummaries", ())
+ assert name in attrs
- #completed.wait()
+ invoke_test(attrs[name])
+ else:
+ for name in [x for x in attrs if x.startswith("_test")]:
+ invoke_test(attrs[name])
finally:
cumin.stop()
+def _test_submit(cumin, cursor):
+ cls = cumin.model.com_redhat_grid.Scheduler
+ obj = cls.get_object(cursor)
+
+ print "Submitting to", obj,
+
+ ad = {"Cmd": "/bin/sleep",
+ "Args": "5m",
+ "JobUniverse": 5,
+ "Requirements": "TRUE",
+ "Iwd": "/tmp",
+ "Owner": "gridmonkey@%s" % obj.Machine,
+ "!!descriptors": {"Requirements": "com.redhat.grid.Expression"}}
+
+ completed = Event()
+
+ def completion(x, y):
+ print "-> [%s, %s]" % (x, y)
+
+ completed.set()
+
+ cumin.session.call_method(completion, obj, "SubmitJob", (ad,))
+
+ completed.wait(30)
+
+def _test_job_summaries(cumin, cursor):
+ cls = cumin.model.com_redhat_grid.Submission
+ obj = cls.get_object(cursor)
+
+ print "Getting job summaries for", obj,
+
+ summs = cumin.model.get_submission_job_summaries(obj)
+
+ print "-> [%i job summaries]" % len(summs)
+
+ #pprint(summs)
+
if __name__ == "__main__":
try:
main()
rhmessaging commits: r4045 - in mgmt/newdata: mint/python/mint and 1 other directory.
Author: justi9
Date: 2010-06-21 15:15:25 -0400 (Mon, 21 Jun 2010)
New Revision: 4045
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/mint/python/mint/main.py
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
Add an event printing option to cumin-data
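The option is a level gate: events are reported with a level number, and only those at or below the configured --print-events LEVEL reach stdout (logging is unaffected). A trimmed, runnable Python sketch of the print_event method added below (the real method lives on the mint model and consults self.app):

    from datetime import datetime

    class Model(object):
        def __init__(self, print_event_level=0):
            self.print_event_level = print_event_level

        def print_event(self, level, message, *args):
            # the real code also does log.info(message, *args)
            if self.print_event_level >= level:
                print(datetime.now(), message % args)

    m = Model(print_event_level=3)
    m.print_event(1, "Broker %s:%i is connected", "localhost", 5672)  # printed
    m.print_event(5, "Heartbeat from %s", "agent-1")                  # suppressed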
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-06-21 15:24:54 UTC (rev 4044)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-06-21 19:15:25 UTC (rev 4045)
@@ -16,6 +16,7 @@
parser = CuminOptionParser(values.data)
parser.add_option("--print-stats", action="store_true")
+ parser.add_option("--print-events", type="int", default=0, metavar="LEVEL")
opts, args = parser.parse_args()
@@ -25,6 +26,8 @@
mint = Mint(model_dir, opts.broker, opts.database)
+ mint.print_event_level = opts.print_events
+
mint.check()
mint.init()
@@ -41,7 +44,7 @@
print "[Starred columns are the number of events per second]"
while True:
- if count % 24 == 0:
+ if count % 20 == 0:
stats.print_headings()
count += 1
Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py 2010-06-21 15:24:54 UTC (rev 4044)
+++ mgmt/newdata/mint/python/mint/main.py 2010-06-21 19:15:25 UTC (rev 4045)
@@ -28,6 +28,8 @@
self.vacuum_enabled = True
self.vacuum_thread = VacuumThread(self)
+ self.print_event_level = 0
+
def check(self):
log.info("Checking %s", self)
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-06-21 15:24:54 UTC (rev 4044)
+++ mgmt/newdata/mint/python/mint/model.py 2010-06-21 19:15:25 UTC (rev 4045)
@@ -42,6 +42,12 @@
finally:
self.lock.release()
+ def print_event(self, level, message, *args):
+ log.info(message, *args)
+
+ if self.app.print_event_level >= level:
+ print datetime.now(), message % args
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.model_dir)
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-21 15:24:54 UTC (rev 4044)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-21 19:15:25 UTC (rev 4045)
@@ -54,23 +54,24 @@
self.model = model
def brokerConnected(self, qmf_broker):
- log.info("Broker at %s:%i is connected",
- qmf_broker.host, qmf_broker.port)
+ message = "Broker %s:%i is connected"
+ self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
def brokerInfo(self, qmf_broker):
- log.info("Broker info from %s", qmf_broker)
+ message = "Broker info from %s:%i"
+ self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
def brokerDisconnected(self, qmf_broker):
- log.info("Broker at %s:%i is disconnected",
- qmf_broker.host, qmf_broker.port)
+ message = "Broker %s:%i is disconnected"
+ self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
def newAgent(self, qmf_agent):
- log.info("Creating %s", qmf_agent)
+ self.model.print_event(3, "Creating %s", qmf_agent)
MintAgent(self.model, qmf_agent)
def delAgent(self, qmf_agent):
- log.info("Deleting %s", qmf_agent)
+ self.model.print_event(3, "Deleting %s", qmf_agent)
try:
agent = self.model.get_agent(qmf_agent)
@@ -79,11 +80,16 @@
agent.delete()
- if self.model.app.update_thread.isAlive():
- up = AgentDelete(self.model, agent)
- self.model.app.update_thread.enqueue(up)
+ if not self.model.app.update_thread.isAlive():
+ return
+ up = AgentDelete(self.model, agent)
+ self.model.app.update_thread.enqueue(up)
+
def heartbeat(self, qmf_agent, timestamp):
+ message = "Heartbeat from %s at %s"
+ self.model.print_event(5, message, qmf_agent, timestamp)
+
timestamp = timestamp / 1000000000
try:
@@ -94,10 +100,10 @@
agent.last_heartbeat = datetime.fromtimestamp(timestamp)
def newPackage(self, name):
- log.info("New package %s", name)
+ self.model.print_event(2, "New package %s", name)
def newClass(self, kind, classKey):
- log.info("New class %s", classKey)
+ self.model.print_event(2, "New class %s", classKey)
def objectProps(self, broker, obj):
agent = self.model.get_agent(obj.getAgent())
@@ -122,12 +128,12 @@
self.model.app.update_thread.enqueue(up)
def event(self, broker, event):
- """ Invoked when an event is raised. """
- pass
+ self.model.print_event(4, "New event %s from %s", broker, event)
def methodResponse(self, broker, seq, response):
- log.info("Method response for request %i received from %s",
- seq, broker)
+ message = "Method response for request %i received from %s"
+ self.model.print_event(3, message, seq, broker)
+
log.debug("Response: %s", response)
self.model.lock.acquire()
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-21 15:24:54 UTC (rev 4044)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-21 19:15:25 UTC (rev 4045)
@@ -147,7 +147,6 @@
class Update(object):
def __init__(self, model):
self.model = model
- self.thread = None
def process(self, thread):
log.debug("Processing %s", self)
@@ -227,10 +226,15 @@
if obj._sync_time:
sql = cls.sql_update.emit(object_columns)
stats.updated += 1
+
+ self.model.print_event(4, "Updating %s", obj)
else:
sql = cls.sql_insert.emit(object_columns)
stats.created += 1
+ message = "Creating %s, from %s"
+ self.model.print_event(3, message, obj, self.agent)
+
statements.append(sql)
obj._sync_time = now
@@ -449,6 +453,8 @@
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
+ self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
+
obj.delete(cursor)
try:
@@ -470,8 +476,13 @@
for pkg in self.model._packages:
for cls in pkg._classes:
for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ message = "Deleting %s, from %s"
+ self.model.print_event(3, message, obj, self.agent)
+
obj.delete(cursor)
+ stats.deleted += 1
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
rhmessaging commits: r4044 - mgmt/newdata/cumin/python/cumin/grid.
Author: eallen
Date: 2010-06-21 11:24:54 -0400 (Mon, 21 Jun 2010)
New Revision: 4044
Modified:
mgmt/newdata/cumin/python/cumin/grid/negotiator.py
Log:
Wait for previous qmf calls to respond before calling Reconfig
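The switch away from self.qmf_call is about ordering: each SetRawConfig must finish before Reconfig is issued, and QmfCall.execute blocks until the completion callback fires. A rough Python sketch of that blocking pattern, assuming a callback-style call_method like cumin's (the Timer merely stands in for the broker's asynchronous reply):

    from threading import Event, Timer

    class QmfCall(object):
        def execute(self, obj, method, *args):
            done = Event()
            result = {}

            def completion(status, data):
                result["status"], result["data"] = status, data
                done.set()

            # Real code: self.app.session.call_method(completion, obj, method, args).
            # Here a Timer fakes the broker replying 0.1s later:
            Timer(0.1, completion, ("OK", {"method": method})).start()

            if not done.wait(30):
                raise Exception("Request timed out")
            return result["status"], result["data"]

    call = QmfCall()
    print(call.execute(None, "SetRawConfig", "GROUP_SORT_EXPR", "1.1"))
    print(call.execute(None, "Reconfig"))  # issued only after the first returns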
Modified: mgmt/newdata/cumin/python/cumin/grid/negotiator.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/negotiator.py 2010-06-21 15:20:31 UTC (rev 4043)
+++ mgmt/newdata/cumin/python/cumin/grid/negotiator.py 2010-06-21 15:24:54 UTC (rev 4044)
@@ -907,12 +907,14 @@
self.app.main_page.main.grid.pool.negotiator.view.show(session)
def do_invoke(self, invoc, negotiator, group, value):
- self.qmf_call(invoc, negotiator, "SetRawConfig", group, value)
+ # don't call self.qmf_call here since we need to
+ # wait for the response before making the next QMF call
+ action = QmfCall(self.app)
+ action.execute(negotiator, "SetRawConfig", group, value)
def reconfig(self, negotiator):
action = QmfCall(self.app)
- #TODO: put this back when setrawconfig is fixed
- #action.execute(negotiator, "Reconfig")
+ action.execute(negotiator, "Reconfig")
class NegotiatorGroupAdd(NegotiatorGroupTask):
def __init__(self, app, frame):
rhmessaging commits: r4043 - mgmt/newdata/wooly/python/wooly.
Author: justi9
Date: 2010-06-21 11:20:31 -0400 (Mon, 21 Jun 2010)
New Revision: 4043
Modified:
mgmt/newdata/wooly/python/wooly/__init__.py
mgmt/newdata/wooly/python/wooly/server.py
Log:
A simple session logging facility, for debugging page crashes
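The facility is just a per-session message list plus a dump on the crash page. A condensed, runnable sketch of the two pieces added below:

    import sys

    class Session(object):
        def __init__(self):
            self.messages = list()

        def log(self, message):
            self.messages.append(message)

    def print_messages(session, writer):
        writer.write("Messages:\n\n")
        for message in session.messages:
            writer.write("  %s\n" % message)
        writer.write("\n")

    session = Session()
    session.log("rendering page main")
    session.log("widget grid.pool selected")
    print_messages(session, sys.stdout)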
Modified: mgmt/newdata/wooly/python/wooly/__init__.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/__init__.py 2010-06-21 14:41:11 UTC (rev 4042)
+++ mgmt/newdata/wooly/python/wooly/__init__.py 2010-06-21 15:20:31 UTC (rev 4043)
@@ -525,12 +525,17 @@
self.cookies_by_name = dict() # name => (newly set?, value, expires)
self.headers_by_name = dict()
+ self.messages = list()
+
def branch(self):
session = Session(self.page)
session.trunk = self
return session
+ def log(self, message):
+ self.messages.append(message)
+
def get_cookie(self, name):
try:
return self.cookies_by_name[name][1]
Modified: mgmt/newdata/wooly/python/wooly/server.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/server.py 2010-06-21 14:41:11 UTC (rev 4042)
+++ mgmt/newdata/wooly/python/wooly/server.py 2010-06-21 15:20:31 UTC (rev 4043)
@@ -136,12 +136,12 @@
if profile:
writer.write("Widget trace:\n\n")
-
profile.print_stack_trace(writer)
writer.write("\n")
+ self.print_messages(session, writer)
+ self.print_session(session, writer)
self.print_url_vars(env["QUERY_STRING"], writer)
- self.print_session(session, writer)
self.print_environment(env, writer)
return writer.to_string()
@@ -154,16 +154,12 @@
return (content,)
- def print_url_vars(self, query, writer):
- writer.write("URL variables:\n\n")
+ def print_messages(self, session, writer):
+ writer.write("Messages:\n\n")
- if query:
- vars = query.split(";")
+ for message in session.messages:
+ writer.write(" %s\n" % message)
- for var in sorted(vars):
- key, value = var.split("=")
- writer.write(" %-30s %s\n" % (key, value))
-
writer.write("\n")
def print_session(self, session, writer):
@@ -176,6 +172,18 @@
writer.write("\n")
+ def print_url_vars(self, query, writer):
+ writer.write("URL variables:\n\n")
+
+ if query:
+ vars = query.split(";")
+
+ for var in sorted(vars):
+ key, value = var.split("=")
+ writer.write(" %-30s %s\n" % (key, value))
+
+ writer.write("\n")
+
def print_environment(self, env, writer):
writer.write("Environment:\n\n")
rhmessaging commits: r4042 - in mgmt/newdata/cumin: python/cumin and 1 other directory.
Author: justi9
Date: 2010-06-21 10:41:11 -0400 (Mon, 21 Jun 2010)
New Revision: 4042
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
mgmt/newdata/cumin/python/cumin/model.py
Log:
Restore job summary background update code; demonstrate its use in cumin-smoke-test
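Worth noting in the model.py hunk: the fixed one-second sleep becomes a bounded poll that returns as soon as the background store has data. The pattern as a small hedged sketch (FakeStore is hypothetical):

    import time

    def wait_for_data(store, attempts=5, delay=1):
        # Poll a background-updated store, waiting at most attempts * delay seconds.
        for _ in range(attempts):
            if store.data:
                break
            time.sleep(delay)
        return store.data

    class FakeStore(object):
        data = None

    print(wait_for_data(FakeStore(), attempts=2, delay=0))  # None: nothing arrived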
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-21 14:41:11 UTC (rev 4042)
@@ -38,25 +38,31 @@
cumin.start()
+ sleep(2)
+
try:
conn = cumin.database.get_connection()
cursor = conn.cursor()
# cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
- cls = cumin.model.com_redhat_grid.Scheduler
+ cls = cumin.model.com_redhat_grid.Submission
obj = cls.get_object(cursor)
- print "Calling echo on", obj
+ print "Calling method on", obj
- completed = Event()
+ summs = cumin.model.get_submission_job_summaries(obj)
+
+ pprint(summs)
+
+ #completed = Event()
- def completion(x, y):
- print x, y
- completed.set()
+ #def completion(x, y):
+ # print x, y
+ # completed.set()
- cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
+ #cumin.session.call_method(completion, obj, "GetJobSummaries", ())
- completed.wait()
+ #completed.wait()
finally:
cumin.stop()
Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py 2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/python/cumin/model.py 2010-06-21 14:41:11 UTC (rev 4042)
@@ -29,8 +29,10 @@
self.task_invocations = list()
self.limits_by_negotiator = dict()
- self.jobs_by_submission = dict()
+ self.job_summaries_by_submission = dict()
+ self.lock = Lock()
+
def check(self):
log.info("Checking %s", self)
@@ -84,22 +86,26 @@
finally:
self.lock.release()
- def get_submission_jobs(self, submission):
+ def get_submission_job_summaries(self, submission):
assert submission
self.lock.acquire()
try:
try:
- store = self.jobs_by_submission[submission]
+ store = self.job_summaries_by_submission[submission]
except KeyError:
- store = SubmissionJobStore(self, submission)
+ store = SubmissionJobSummaryStore(self, submission)
store.start_updates()
- self.jobs_by_submission[submission] = store
+ self.job_summaries_by_submission[submission] = store
- sleep(1)
+ for i in range(5):
+ if store.data:
+ break
+ sleep(1)
+
return store.data
finally:
self.lock.release()
@@ -1959,14 +1965,20 @@
self.setDaemon(True)
def run(self):
- for i in range(20):
- try:
- self.store.update()
- except Exception, e:
- log.exception(e)
+ conn = self.store.model.app.database.get_connection()
+ cursor = conn.cursor()
- sleep(30)
+ try:
+ for i in range(20):
+ try:
+ self.store.update(cursor)
+ except Exception, e:
+ log.exception(e)
+ sleep(30)
+ finally:
+ conn.close()
+
self.store.delete()
class UpdateTimedOut(Exception):
@@ -1978,7 +1990,7 @@
self.negotiator = negotiator
- def update(self):
+ def update(self, cursor):
def completion(status, data):
self.data = data["Limits"]
@@ -1989,21 +2001,20 @@
super(NegotiatorLimitStore, self).delete()
-class SubmissionJobStore(ObjectStore):
+class SubmissionJobSummaryStore(ObjectStore):
def __init__(self, model, submission):
- super(SubmissionJobStore, self).__init__(model)
+ super(SubmissionJobSummaryStore, self).__init__(model)
self.submission = submission
- def update(self):
+ def update(self, cursor):
def completion(status, data):
self.data = data["Jobs"]
- scheduler = self.submission.scheduler
+ self.model.app.session.call_method \
+ (completion, self.submission, "GetJobSummaries", ())
- scheduler.GetJobs(completion, self.submission.Name, None)
-
def delete(self):
- del self.model.jobs_by_submission[self.submission]
+ del self.model.job_summaries_by_submission[self.submission]
- super(SubmissionJobStore, self).delete()
+ super(SubmissionJobSummaryStore, self).delete()