[jboss-user] [JBoss Messaging] - Re: Reliable delivery

vc123 do-not-reply at jboss.com
Tue Feb 12 09:31:03 EST 2008


"ataylor" wrote : anonymous wrote : But, what about non_persistent messages ? In the current JM implementation non_persistent delivery performance seems to be determined by the relational database backend too as I wrote earlier, though it does not suffer from sync writes as much as persistent delivery does. 
  | 
  | The only time non persistent messages are written to the database is if paging kicks in. Theres a configurable limit on a destination that controls how many messages can be held in memory. at any point.  Are you consuming the messages in your test?

Andy,

Wile we are at it,  I substituted Postgres for Oracle since it is more readily available to anyone wishing to try my simple test.  

We have two goals with our application: the fastest possible non_persistent delivery and control message persistent delivery with a desired rate of a dozen m/s.  While the second goal seems to be satisfiable,  the first is a bit tricky.

Here's the sender/receiver code.  


  | -- Sender
  | 
  | import java.util.Properties;
  | import javax.jms.*;
  | import javax.naming.*;
  | 
  | class MySender {
  |     static boolean keepSending = false;
  |     static int count = 0;
  | 
  |     static class Thread1 extends Thread {
  |        int oldCount= 0;
  |        public void run() {
  |            try {
  |             while(true) {
  |              sleep(10000);
  | //             keepSending = false;
  |              System.out.println("Sent per sec: "+ (count - oldCount)/10);
  |              oldCount= count;
  |             }
  |            } catch (Exception e) {System.out.println(e);}
  |        }
  |     }
  | 
  |     public static void main(String[] argv) throws Exception
  |     {
  | 
  |             String URL = "jnp://localhost:1099";
  |             Properties env = new Properties();
  |             env.put(Context.PROVIDER_URL, URL);
  |             env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  |             InitialContext ctx = new InitialContext(env);
  |             QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
  | 
  | 
  |             QueueConnection conn  = qcf.createQueueConnection();
  |             QueueSession sess = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
  |             Queue queue = sess.createQueue("q1");
  | 
  | 
  |             BytesMessage message = sess.createBytesMessage();
  |             byte[] content = new byte[1024];
  |             for (int i = 0; i < content.length; i++)
  |             {
  |               content = (byte) (i & 0xFF);
  |             }
  |             message.writeBytes(content);
  | 
  |          QueueSender sender = sess.createSender(queue);
  |          conn.start();
  | 
  |          keepSending=true;
  |          (new Thread1()).start();
  |          while (keepSending)
  |          {
  |             sender.send(message, javax.jms.DeliveryMode.NON_PERSISTENT, 4, 1000);
  |             count++;
  |          }
  |          System.out.println("Count: "+count);
  | 
  |     }
  | 
  | }
  | 
  | -- Receiver
  | 
  | import java.util.Properties;
  | import javax.jms.*;
  | import javax.naming.*;
  | 
  | class MyReceiver implements MessageListener {
  |     static int count = 0;
  | 
  |     static class Thread1 extends Thread {
  |        int oldCount= 0;
  |        public void run() {
  |            try {
  |             while(true) {
  |              sleep(10000);
  |              System.out.println("Received per sec: "+ (count - oldCount)/10);
  |              oldCount= count;
  |             }
  |            } catch (Exception e) {System.out.println(e);}
  |        }
  |     }
  | 
  |     public void onMessage(Message message)
  |     {
  |       BytesMessage bm = (BytesMessage) message;
  |       count++;
  |     }
  | 
  |     public static void main(String[] argv) throws Exception
  |     {
  | 
  |             String URL = "jnp://localhost:1099";
  |             Properties env = new Properties();
  |             env.put(Context.PROVIDER_URL, URL);
  |             env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  |             InitialContext ctx = new InitialContext(env);
  |             QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
  | 
  | 
  |             QueueConnection conn  = qcf.createQueueConnection();
  |             QueueSession sess = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
  |             Queue queue = sess.createQueue("q1");
  | 
  |             QueueReceiver receiver  = sess.createReceiver(queue);
  |             receiver.setMessageListener(new MyReceiver());
  | 
  |             (new Thread1()).start();
  |             conn.start();
  | 
  |             Thread.sleep(3600*10000);
  |     }
  | 
  | }
  | 


The sender, receiver and the Messaging ran in three separate VMs.  The Messaging ran under Jboss 4.2.2 but the results are similar with 5.0 Beta 4.  Every ten second,  both S and R print the current message rate.  

Here's what happens,  see the output:


  | --Sender
  | 
  | Sent per sec: 12544
  | Sent per sec: 17100
  | Sent per sec: 12134
  | Sent per sec: 13414
  | Sent per sec: 11161
  | Sent per sec: 8089
  | Sent per sec: 4044
  | Sent per sec: 1502
  | Sent per sec: 238
  | Sent per sec: 68
  | Sent per sec: 21
  | Sent per sec: 0
  | 
  | -- Receiver
  | 
  | Received per sec: 13879
  | Received per sec: 16058
  | Received per sec: 9916
  | Received per sec: 11360
  | Received per sec: 6189
  | Received per sec: 3958
  | Received per sec: 2104
  | Received per sec: 397
  | Received per sec: 8
  | Received per sec: 0
  | Received per sec: 0
  | Received per sec: 0
  | 
  | 

as you can see,  after a short while the consumer cannot consume and the producer cannot produce any more.  Upon killing both S and R,  the JBoss Messaging dies with this stack:

  | 08:57:38,423 ERROR [ServerConsumerEndpoint] Failed to expire delivery: Delivery[Reference[5480029]:NON-RELIABLE]
  | java.lang.OutOfMemoryError: GC overhead limit exceeded
  |         at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
  |         at java.lang.StringBuilder.<init>(StringBuilder.java:68)
  |         at org.jboss.jms.destination.JBossQueue.toString(JBossQueue.java:80)
  |         at org.jboss.jms.server.endpoint.ServerSessionEndpoint.makeCopyForDLQOrExpiry(ServerSessionEndpoint.java:1655)
  |         at org.jboss.jms.server.endpoint.ServerSessionEndpoint.expireDelivery(ServerSessionEndpoint.java:1085)
  |         at org.jboss.jms.server.endpoint.ServerConsumerEndpoint.handle(ServerConsumerEndpoint.java:231)
  |         at org.jboss.messaging.core.impl.RoundRobinDistributor.handle(RoundRobinDistributor.java:119)
  |         at org.jboss.messaging.core.impl.MessagingQueue$DistributorWrapper.handle(MessagingQueue.java:582)
  |         at org.jboss.messaging.core.impl.ClusterRoundRobinDistributor.handle(ClusterRoundRobinDistributor.java:79)
  |         at org.jboss.messaging.core.impl.ChannelSupport.deliverInternal(ChannelSupport.java:606)
  |         at org.jboss.messaging.core.impl.MessagingQueue.deliverInternal(MessagingQueue.java:505)
  |         at org.jboss.messaging.core.impl.ChannelSupport.deliver(ChannelSupport.java:356)
  |         at org.jboss.jms.server.endpoint.ServerSessionEndpoint$2.run(ServerSessionEndpoint.java:1539)
  |         at EDU.oswego.cs.dl.util.concurrent.QueuedExecutor$RunLoop.run(QueuedExecutor.java:89)
  |         at java.lang.Thread.run(Thread.java:619)
  | 08:57:55,232 ERROR [STDERR] Exception in thread "WorkerThread#1[127.0.0.1:60882]"
  | 

VJ

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4128768#4128768

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4128768



More information about the jboss-user mailing list