[jboss-user] [JBoss Messaging] - Re: Reliable delivery
vc123
do-not-reply at jboss.com
Tue Feb 12 09:31:03 EST 2008
"ataylor" wrote : anonymous wrote : But, what about non_persistent messages ? In the current JM implementation non_persistent delivery performance seems to be determined by the relational database backend too as I wrote earlier, though it does not suffer from sync writes as much as persistent delivery does.
|
| The only time non-persistent messages are written to the database is if paging kicks in. There's a configurable limit on a destination that controls how many messages can be held in memory at any point. Are you consuming the messages in your test?
Andy,
While we are at it, I substituted Postgres for Oracle since it is more readily available to anyone wishing to try my simple test.
We have two goals with our application: the fastest possible non_persistent delivery, and persistent delivery of control messages at a desired rate of about a dozen messages per second. While the second goal seems to be satisfiable, the first is a bit tricky.
Here's the sender/receiver code.
| -- Sender
|
| import java.util.Properties;
| import javax.jms.*;
| import javax.naming.*;
|
| class MySender {
| static boolean keepSending = false;
| static int count = 0;
|
| static class Thread1 extends Thread {
| int oldCount= 0;
| public void run() {
| try {
| while(true) {
| sleep(10000);
| // keepSending = false;
| System.out.println("Sent per sec: "+ (count - oldCount)/10);
| oldCount= count;
| }
| } catch (Exception e) {System.out.println(e);}
| }
| }
|
| public static void main(String[] argv) throws Exception
| {
|
| String URL = "jnp://localhost:1099";
| Properties env = new Properties();
| env.put(Context.PROVIDER_URL, URL);
| env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
| InitialContext ctx = new InitialContext(env);
| QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
|
|
| QueueConnection conn = qcf.createQueueConnection();
| QueueSession sess = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
| Queue queue = sess.createQueue("q1");
|
|
| BytesMessage message = sess.createBytesMessage();
| byte[] content = new byte[1024];
| for (int i = 0; i < content.length; i++)
| {
| content[i] = (byte) (i & 0xFF);
| }
| message.writeBytes(content);
|
| QueueSender sender = sess.createSender(queue);
| conn.start();
|
| keepSending=true;
| (new Thread1()).start();
| while (keepSending)
| {
| sender.send(message, javax.jms.DeliveryMode.NON_PERSISTENT, 4, 1000);
| count++;
| }
| System.out.println("Count: "+count);
|
| }
|
| }
|
| -- Receiver
|
| import java.util.Properties;
| import javax.jms.*;
| import javax.naming.*;
|
| class MyReceiver implements MessageListener {
| static int count = 0;
|
| static class Thread1 extends Thread {
| int oldCount= 0;
| public void run() {
| try {
| while(true) {
| sleep(10000);
| System.out.println("Received per sec: "+ (count - oldCount)/10);
| oldCount= count;
| }
| } catch (Exception e) {System.out.println(e);}
| }
| }
|
| public void onMessage(Message message)
| {
| BytesMessage bm = (BytesMessage) message;
| count++;
| }
|
| public static void main(String[] argv) throws Exception
| {
|
| String URL = "jnp://localhost:1099";
| Properties env = new Properties();
| env.put(Context.PROVIDER_URL, URL);
| env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
| InitialContext ctx = new InitialContext(env);
| QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
|
|
| QueueConnection conn = qcf.createQueueConnection();
| QueueSession sess = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
| Queue queue = sess.createQueue("q1");
|
| QueueReceiver receiver = sess.createReceiver(queue);
| receiver.setMessageListener(new MyReceiver());
|
| (new Thread1()).start();
| conn.start();
|
| Thread.sleep(3600*10000);
| }
|
| }
|
The sender, the receiver, and JBoss Messaging ran in three separate VMs. Messaging ran under JBoss 4.2.2, but the results are similar with 5.0 Beta 4. Every ten seconds, both the sender (S) and the receiver (R) print the current message rate.
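One caveat about the reporting itself: in the code above, count is a plain static int that one thread increments and another thread reads, so the printed rates are only approximate. A minimal sketch of a thread-safe alternative follows, assuming a hypothetical helper class named RateCounter (it is not part of the original test and has nothing to do with the JBoss Messaging API):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the original sender/receiver code.
// It keeps the same "messages since the last report / interval" arithmetic,
// but makes the counter safe to share between threads.
class RateCounter {
    // Incremented by the sending or receiving thread.
    private final AtomicLong count = new AtomicLong();
    // Counter value seen at the previous report; guarded by "this".
    private long lastSeen = 0;

    // Call once per message sent or received.
    void increment() {
        count.incrementAndGet();
    }

    // Returns the messages per second since the previous call,
    // using integer division as the original code does.
    synchronized long ratePerSecond(long intervalSeconds) {
        long now = count.get();
        long rate = (now - lastSeen) / intervalSeconds;
        lastSeen = now;
        return rate;
    }
}
```

With something like this, the send loop or onMessage would call increment(), and the reporting thread would print ratePerSecond(10) after each sleep(10000). This should not change the behaviour shown in the output, only make the numbers trustworthy.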
Here's what happens, see the output:
| --Sender
|
| Sent per sec: 12544
| Sent per sec: 17100
| Sent per sec: 12134
| Sent per sec: 13414
| Sent per sec: 11161
| Sent per sec: 8089
| Sent per sec: 4044
| Sent per sec: 1502
| Sent per sec: 238
| Sent per sec: 68
| Sent per sec: 21
| Sent per sec: 0
|
| -- Receiver
|
| Received per sec: 13879
| Received per sec: 16058
| Received per sec: 9916
| Received per sec: 11360
| Received per sec: 6189
| Received per sec: 3958
| Received per sec: 2104
| Received per sec: 397
| Received per sec: 8
| Received per sec: 0
| Received per sec: 0
| Received per sec: 0
|
|
As you can see, after a short while the consumer can no longer consume and the producer can no longer produce. Upon killing both S and R, JBoss Messaging dies with this stack trace:
| 08:57:38,423 ERROR [ServerConsumerEndpoint] Failed to expire delivery: Delivery[Reference[5480029]:NON-RELIABLE]
| java.lang.OutOfMemoryError: GC overhead limit exceeded
| at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
| at java.lang.StringBuilder.<init>(StringBuilder.java:68)
| at org.jboss.jms.destination.JBossQueue.toString(JBossQueue.java:80)
| at org.jboss.jms.server.endpoint.ServerSessionEndpoint.makeCopyForDLQOrExpiry(ServerSessionEndpoint.java:1655)
| at org.jboss.jms.server.endpoint.ServerSessionEndpoint.expireDelivery(ServerSessionEndpoint.java:1085)
| at org.jboss.jms.server.endpoint.ServerConsumerEndpoint.handle(ServerConsumerEndpoint.java:231)
| at org.jboss.messaging.core.impl.RoundRobinDistributor.handle(RoundRobinDistributor.java:119)
| at org.jboss.messaging.core.impl.MessagingQueue$DistributorWrapper.handle(MessagingQueue.java:582)
| at org.jboss.messaging.core.impl.ClusterRoundRobinDistributor.handle(ClusterRoundRobinDistributor.java:79)
| at org.jboss.messaging.core.impl.ChannelSupport.deliverInternal(ChannelSupport.java:606)
| at org.jboss.messaging.core.impl.MessagingQueue.deliverInternal(MessagingQueue.java:505)
| at org.jboss.messaging.core.impl.ChannelSupport.deliver(ChannelSupport.java:356)
| at org.jboss.jms.server.endpoint.ServerSessionEndpoint$2.run(ServerSessionEndpoint.java:1539)
| at EDU.oswego.cs.dl.util.concurrent.QueuedExecutor$RunLoop.run(QueuedExecutor.java:89)
| at java.lang.Thread.run(Thread.java:619)
| 08:57:55,232 ERROR [STDERR] Exception in thread "WorkerThread#1[127.0.0.1:60882]"
|
VJ
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4128768#4128768