[jboss-cvs] JBoss Messaging SVN: r4203 - in trunk: src/main/org/jboss/messaging/core/server and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 15 07:38:09 EDT 2008
Author: timfox
Date: 2008-05-15 07:38:09 -0400 (Thu, 15 May 2008)
New Revision: 4203
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/util/TypedProperties.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Performance improvements also correct TypedProperties size change
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -23,6 +23,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.mina.common.IoSession;
@@ -30,7 +31,6 @@
import org.apache.mina.common.IoSessionDataStructureFactory;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestQueue;
-import org.apache.mina.util.CircularQueue;
/**
*
@@ -52,7 +52,7 @@
public WriteRequestQueue getWriteRequestQueue(IoSession session)
throws Exception
{
- return new DefaultWriteRequestQueue();
+ return new ConcurrentWriteRequestQueue();
}
@@ -134,38 +134,9 @@
}
- private static class DefaultWriteRequestQueue implements WriteRequestQueue
- {
- private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
-
- public void dispose(IoSession session) {
- }
-
- public void clear(IoSession session) {
- q.clear();
- }
-
- public synchronized boolean isEmpty(IoSession session) {
- return q.isEmpty();
- }
-
- public synchronized void offer(IoSession session, WriteRequest writeRequest) {
- q.offer(writeRequest);
- }
-
- public synchronized WriteRequest poll(IoSession session) {
- return q.poll();
- }
-
- @Override
- public String toString() {
- return q.toString();
- }
- }
-
// private static class DefaultWriteRequestQueue implements WriteRequestQueue
// {
-// private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+// private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
//
// public void dispose(IoSession session) {
// }
@@ -191,5 +162,34 @@
// return q.toString();
// }
// }
+
+ private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
+ {
+ private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+
+ public void dispose(IoSession session) {
+ }
+
+ public void clear(IoSession session) {
+ q.clear();
+ }
+ public synchronized boolean isEmpty(IoSession session) {
+ return q.isEmpty();
+ }
+
+ public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+ q.offer(writeRequest);
+ }
+
+ public synchronized WriteRequest poll(IoSession session) {
+ return q.poll();
+ }
+
+ @Override
+ public String toString() {
+ return q.toString();
+ }
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -141,7 +141,6 @@
}
}
});
-
}
private final int high = 2000;
@@ -177,10 +176,6 @@
public void acquireSemaphore() throws Exception
{
- // if (!sem.tryAcquire(5000, TimeUnit.MILLISECONDS))
- // {
- // throw new IllegalStateException("Timed out");
- // }
int newcount = count.incrementAndGet();
if (newcount == high)
@@ -214,7 +209,17 @@
{
public void send(Packet p) throws Exception
{
+ try
+ {
+ acquireSemaphore();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to acquire sem", e);
+ }
+
dispatcher.callFilters(p);
+
session.write(p);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -42,9 +42,9 @@
MessageReference createReference(Queue queue);
- void decrementDurableRefCount();
+ int decrementDurableRefCount();
- void incrementDurableRefCount();
+ int incrementDurableRefCount();
int getDurableRefCount();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -43,7 +43,6 @@
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
@@ -172,6 +171,8 @@
deliver();
}
+ //private volatile int count = 0;
+
/*
* Attempt to deliver all the messages in the queue
*
@@ -219,7 +220,11 @@
{
if (iterator == null)
{
- messageReferences.removeFirst();
+// count++;
+// if (count == 500000)
+// {
+ messageReferences.removeFirst();
+ // }
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -120,14 +120,14 @@
return durableRefCount.get();
}
- public void decrementDurableRefCount()
+ public int decrementDurableRefCount()
{
- durableRefCount.decrementAndGet();
+ return durableRefCount.decrementAndGet();
}
- public void incrementDurableRefCount()
+ public int incrementDurableRefCount()
{
- durableRefCount.incrementAndGet();
+ return durableRefCount.incrementAndGet();
}
public ServerMessage copy()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -24,11 +24,12 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -55,7 +56,6 @@
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConnection;
@@ -128,13 +128,15 @@
private final Set<ServerProducer> producers = new ConcurrentHashSet<ServerProducer>();
- private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
+ private final java.util.Queue<Delivery> deliveries = new ConcurrentLinkedQueue<Delivery>();
- private long deliveryIDSequence = 0;
+ private final AtomicLong deliveryIDSequence = new AtomicLong(0);
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Transaction tx;
+
+ private final Object rollbackCancelLock = new Object();
// Constructors
// ---------------------------------------------------------------------------------
@@ -223,12 +225,18 @@
dispatcher.unregister(producer.getID());
}
- public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+ public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
- Delivery delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), deliveryIDSequence++, sender);
-
- deliveries.add(delivery);
-
+ Delivery delivery;
+ synchronized (rollbackCancelLock)
+ {
+ long nextID = deliveryIDSequence.getAndIncrement();
+
+ delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
+
+ deliveries.add(delivery);
+ }
+
delivery.deliver();
}
@@ -324,7 +332,7 @@
}
}
- public synchronized void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
+ public void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
{
/*
Note that we do not consider it an error if the deliveries cannot be found to be acked.
@@ -421,7 +429,7 @@
}
// Synchronize to prevent any new deliveries arriving during this recovery.
- synchronized (this)
+ synchronized (rollbackCancelLock)
{
// Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
// order in a single contiguous block
@@ -433,7 +441,7 @@
deliveries.clear();
- deliveryIDSequence -= tx.getAcknowledgementsCount();
+ deliveryIDSequence.addAndGet(-tx.getAcknowledgementsCount());
}
tx.rollback(queueSettingsRepository);
@@ -449,7 +457,7 @@
Transaction cancelTx;
- synchronized (this)
+ synchronized (rollbackCancelLock)
{
cancelTx = new TransactionImpl(persistenceManager, postOffice);
@@ -1066,19 +1074,16 @@
if (message.isDurable() && queue.isDurable())
{
- synchronized (message)
+ int count = message.decrementDurableRefCount();
+
+ if (count == 0)
{
- message.decrementDurableRefCount();
-
- if (message.getDurableRefCount() == 0)
- {
- persistenceManager.storeDelete(message.getMessageID());
- }
- else
- {
- persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
- }
+ persistenceManager.storeDelete(message.getMessageID());
}
+ else
+ {
+ persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+ }
}
queue.referenceAcknowledged();
Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -45,7 +45,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.logging.Logger;
@@ -64,8 +63,9 @@
private static final Logger log = Logger.getLogger(TypedProperties.class);
private Map<SimpleString, PropertyValue> properties;
- AtomicInteger size = new AtomicInteger(0);
+ private volatile int size;
+
public TypedProperties()
{
}
@@ -168,7 +168,7 @@
int numHeaders = buffer.getInt();
properties = new HashMap<SimpleString, PropertyValue>(numHeaders);
- size.set(0);
+ size = 0;
for (int i = 0; i < numHeaders; i++)
{
@@ -290,7 +290,7 @@
}
else
{
- return SIZE_BYTE + SIZE_INT + size.intValue();
+ return SIZE_BYTE + SIZE_INT + size;
}
}
@@ -318,11 +318,11 @@
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null)
{
- size.addAndGet(value.encodeSize() - oldValue.encodeSize());
+ size += value.encodeSize() - oldValue.encodeSize();
}
else
{
- size.addAndGet(SimpleString.sizeofString(key) + value.encodeSize());
+ size += SimpleString.sizeofString(key) + value.encodeSize();
}
}
@@ -334,15 +334,15 @@
}
PropertyValue val = properties.remove(key);
-
- size.addAndGet((SimpleString.sizeofString(key) + val.encodeSize()) * -1);
-
+
if (val == null)
{
return null;
}
else
{
+ size -= SimpleString.sizeofString(key) + val.encodeSize();
+
return val.getValue();
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -339,8 +339,6 @@
assertTrue(fast.processed == numMessages - 2);
- // Thread.sleep(10000);
-
}
finally
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-15 11:38:09 UTC (rev 4203)
@@ -86,179 +86,6 @@
conn.close();
}
- public static void main(String[] args)
- {
- try
- {
- CoreClientTest test = new CoreClientTest();
-
- test.setUp();
- test.testCoreClientPerf();
- test.tearDown();
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- }
- }
-
- public void testCoreClientPerf() throws Exception
- {
- Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-
- ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
- cf.setDefaultConsumerWindowSize(-1);
- // cf.setDefaultProducerMaxRate(30000);
-
- ClientConnection conn = cf.createConnection();
-
- final ClientSession session = conn.createClientSession(false, true, true, 1000, false, false);
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
-
- byte[] bytes = new byte[1000];
-
- message.getBody().putBytes(bytes);
-
- message.getBody().flip();
-
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
-
- final CountDownLatch latch = new CountDownLatch(1);
-//
- final int numMessages = 100000000;
-
- class MyHandler implements MessageHandler
- {
- int count;
-
- public void onMessage(ClientMessage msg)
- {
- count++;
-
- try
- {
- session.acknowledge();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
- consumer.setMessageHandler(new MyHandler());
-
-
-
- //System.out.println("Waiting 10 secs");
-
- // Thread.sleep(10000);
-
-
-
- System.out.println("Starting");
-
-
- //Warmup
- for (int i = 0; i < 50000; i++)
- {
- producer.send(message);
- }
-//
-// System.out.println("Waiting 10 secs");
-//
-// Thread.sleep(10000);
-
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < numMessages; i++)
- {
- producer.send(message);
- }
-
-
-
-
- //long end = System.currentTimeMillis();
-
- //double actualRate = 1000 * (double)numMessages / ( end - start);
-
- //System.out.println("Send Rate is " + actualRate);
-
-// long end = System.currentTimeMillis();
-//
-// double actualRate = 1000 * (double)numMessages / ( end - start);
-
- // System.out.println("Rate is " + actualRate);
-
- // conn.start();
-
- // start = System.currentTimeMillis();
-
- // latch.await();
-
-// long end = System.currentTimeMillis();
-//
-// double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-// System.out.println("Rate is " + actualRate);
-
- //conn.start();
-
- //System.out.println("Waiting 10 secs");
-
-
-
- long end = System.currentTimeMillis();
-
- double actualRate = 1000 * (double)numMessages / ( end - start);
-
- System.out.println("Rate is " + actualRate);
-
- //Thread.sleep(10000);
-
-
- // conn.start();
-//
-//
-// start = System.currentTimeMillis();
-//
-
-// conn.start();
-////
-// start = System.currentTimeMillis();
-////
-// latch.await();
-////
-////
-// end = System.currentTimeMillis();
-//
-// actualRate = 1000 * (double)numMessages / ( end - start);
-//
-// System.out.println("Rate is " + actualRate);
-
-//
-// message = consumer.receive(1000);
-//
-// assertEquals("testINVMCoreClient", message.getBody().getString());
-//
- conn.close();
- }
-
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list