OrderedMemoryAwareThreadPoolExecutor with UDP

"이희승 (Trustin Lee)" trustin at gmail.com
Wed Jul 15 08:34:52 EDT 2009


Hi Cygan,

On 07/15/2009 08:58 PM, Cygan wrote:
> Hi, Trustin, thank you for your reply.

You're welcome.  Sorry for being later than expected.

> I use UDP instead of TCP because of NAT traversal, i need
> OrderedMemoryAwareThreadPoolExecutor because my application need reliable
> data delivery. And yes, i did use sequence number for the reliability.

I see.  Sounds like a cool project. :)

> For removal of childExecutors map, is it thread-safe to remove from
> anywhere? or i must do it inside the executor like the default one?

Removal of an entry from childExecutors map is thread safe as it's
ConcurrentMap.  That is, you can remove an entry from anywhere.

HTH,
Trustin

> Trustin Lee wrote:
>> Hi Cygan,
>>
>> Actually, there was a similar request from another user.  Yes, you can
>> modify OrderedMemoryAwareThreadPoolExecutor so that you can maintain the
>> order of the events per 'some key'.  However, you will have to remove
>> the key from the childExecutors map manually because the modified
>> executor will not be able to determine when the map entry should be
>> released.
>>
>> On the other hand, UDP does not guarantee the packet delivery order.
>> Therefore, no matter what you do in your application, the package can
>> arrive in an incorrect order.  You have to put the sequence number of
>> each packet to trace or sort the delivery order.  I would use TCP simply.
>>
>> HTH,
>> Trustin
>>
>> On 07/14/2009 11:42 AM, Cygan wrote:
>>> hi, i'm trying to get OrderedMemoryAwareThreadPoolExecutor to work with
>>> UDP,
>>> since UDP don't have channel on it, so i use session id to maintains the
>>> events order instead of channel. Here's the code for my custom
>>> ThreadPoolExecutor. Are my code going to work fine just like the default
>>> one
>>> with TCP?
>>>
>>>
>>> public class MyCustomThreadPoolExecutor extends
>>>         MemoryAwareThreadPoolExecutor {
>>>
>>>     private final ConcurrentMap<Integer, Executor> childExecutors =
>>>         new ConcurrentIdentityWeakKeyHashMap<Integer, Executor>();
>>>
>>> ...
>>> ...
>>> ...
>>>
>>>    private Executor getOrderedExecutor(ChannelEvent e) {
>>>     	int sessionID = -1; 	
>>>     	if(e instanceof UpstreamMessageEvent){
>>>     		UpstreamMessageEvent k = (UpstreamMessageEvent)e;
>>>         	ChannelBuffer msg = (ChannelBuffer)k.getMessage();
>>>                 sessionID = msg.readInt();
>>>         	msg.readerIndex(0);
>>>     	}
>>>     	
>>>         //Channel channel = e.getChannel();
>>>         Executor executor = childExecutors.get(sessionID);
>>>         if (executor == null) {
>>>
>>>             executor = new ChildExecutor();
>>>             Executor oldExecutor = childExecutors.putIfAbsent(sessionID,
>>> executor);
>>>             if (oldExecutor != null) {
>>>                 executor = oldExecutor;
>>>             }
>>>         }
>>>
>>>         // Remove the entry when the channel closes.
>>>         /*
>>>         if (e instanceof ChannelStateEvent) {
>>>             ChannelStateEvent se = (ChannelStateEvent) e;
>>>             if (se.getState() == ChannelState.OPEN &&
>>>                 !channel.isOpen()) {
>>>                 childExecutors.remove(channel);
>>>             }
>>>         }
>>>         */
>>>         return executor;
>>>     }
>>> ...
>>> ...
>>> ...
>>> }
>> _______________________________________________
>> netty-users mailing list
>> netty-users at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/netty-users
>>
>>
> 




More information about the netty-users mailing list