OrderedMemoryAwareThreadPoolExecutor with UDP

Cygan cygan80 at hotmail.com
Wed Jul 15 07:58:47 EDT 2009


Hi, Trustin, thank you for your reply.

I use UDP instead of TCP because of NAT traversal, i need
OrderedMemoryAwareThreadPoolExecutor because my application need reliable
data delivery. And yes, i did use sequence number for the reliability.

For removal of childExecutors map, is it thread-safe to remove from
anywhere? or i must do it inside the executor like the default one?


Trustin Lee wrote:
> 
> Hi Cygan,
> 
> Actually, there was a similar request from another user.  Yes, you can
> modify OrderedMemoryAwareThreadPoolExecutor so that you can maintain the
> order of the events per 'some key'.  However, you will have to remove
> the key from the childExecutors map manually because the modified
> executor will not be able to determine when the map entry should be
> released.
> 
> On the other hand, UDP does not guarantee the packet delivery order.
> Therefore, no matter what you do in your application, the package can
> arrive in an incorrect order.  You have to put the sequence number of
> each packet to trace or sort the delivery order.  I would use TCP simply.
> 
> HTH,
> Trustin
> 
> On 07/14/2009 11:42 AM, Cygan wrote:
>> hi, i'm trying to get OrderedMemoryAwareThreadPoolExecutor to work with
>> UDP,
>> since UDP don't have channel on it, so i use session id to maintains the
>> events order instead of channel. Here's the code for my custom
>> ThreadPoolExecutor. Are my code going to work fine just like the default
>> one
>> with TCP?
>> 
>> 
>> public class MyCustomThreadPoolExecutor extends
>>         MemoryAwareThreadPoolExecutor {
>> 
>>     private final ConcurrentMap<Integer, Executor> childExecutors =
>>         new ConcurrentIdentityWeakKeyHashMap<Integer, Executor>();
>> 
>> ...
>> ...
>> ...
>> 
>>    private Executor getOrderedExecutor(ChannelEvent e) {
>>     	int sessionID = -1; 	
>>     	if(e instanceof UpstreamMessageEvent){
>>     		UpstreamMessageEvent k = (UpstreamMessageEvent)e;
>>         	ChannelBuffer msg = (ChannelBuffer)k.getMessage();
>>                 sessionID = msg.readInt();
>>         	msg.readerIndex(0);
>>     	}
>>     	
>>         //Channel channel = e.getChannel();
>>         Executor executor = childExecutors.get(sessionID);
>>         if (executor == null) {
>> 
>>             executor = new ChildExecutor();
>>             Executor oldExecutor = childExecutors.putIfAbsent(sessionID,
>> executor);
>>             if (oldExecutor != null) {
>>                 executor = oldExecutor;
>>             }
>>         }
>> 
>>         // Remove the entry when the channel closes.
>>         /*
>>         if (e instanceof ChannelStateEvent) {
>>             ChannelStateEvent se = (ChannelStateEvent) e;
>>             if (se.getState() == ChannelState.OPEN &&
>>                 !channel.isOpen()) {
>>                 childExecutors.remove(channel);
>>             }
>>         }
>>         */
>>         return executor;
>>     }
>> ...
>> ...
>> ...
>> }
> 
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
> 
> 

-- 
View this message in context: http://n2.nabble.com/OrderedMemoryAwareThreadPoolExecutor-with-UDP-tp3254023p3262702.html
Sent from the Netty User Group mailing list archive at Nabble.com.



More information about the netty-users mailing list