读书人

两个周前挖了一个坑现在跳了进去lt;ri

发布时间: 2013-08-10 21:14:06 作者: rapoo

两个周前挖了一个坑,现在跳了进去<riak mapreduce 分析>,顺便能实现分页功能了
map(Record, undefined, {sub_rank}) -> ?DEBUG("~p:map sub_rank ~p ~n", [?MODULE, ?LINE]), case riak_kv_util:is_x_deleted(Record) of false -> {struct, List} = mochijson:decode(riak_object:get_value(Record)), FollowedId = get_value(List, "followed_id"), [{FollowedId, 1}]; _ ->[] end;

?接着写了这样一个reduce

?

reduce[1]: 把相同的followedId的value相加

?

reduce(Records,  {sub_rank}) ->   FSum = fun({FollowedId, Count}, Acc) ->                Value = proplists:get_value(FollowedId, Acc, 0),                [{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]           end,    lists:foldr(FSum, [], Records); 

?在3000个obj的情况下跑了一边,没问题大公告成。

?

因为还涉及到取出根据count排序前50条。

所以需要再添加reduce[2]

reduce[2]:取出前50条记录

?

reduce(Records,  {sub_rank, Max}) when is_integer(Max) ->    ?DEBUG("~p:reduce sub_rank ~p Max=~p, Records=~p~n", [?MODULE, ?LINE, Max, Records]),    lists:sublist(lists:reverse(lists:keysort(2,Records)), Max);

?

?

本以为这样就完事了,当20万个obj情况下,这个mapreduce照样查询不出数据来,一直提示timeout,设置10分钟,15分钟都是timeout。

? ??Query代码:

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},          {reduce,{modfun,trend_riak,reduce},{sub_rank},false},          {reduce,{modfun,trend_riak,reduce},{sub_rank, 50},true}],  

?

?1、分析慢的原因

? ? 只用riak_pb_socket:mapred/3执行map,不执行reduce

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},
? ?数据能查询出来大约花了4s。

?

? ?我这时只是感觉奇怪,心想为什么返回数据多了还快了。reduce使其返回数据少了,为什么却慢了。

? ?这下只好仔细看看reduce代码了。

? ? 定位到了proplist操作上。

? ??

Value = proplists:get_value(FollowedId, Acc, 0),[{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]
?? ?测试了一下,发现两个操作确实比较耗时。

?

?

? ? 这时我想到了用dict才实现

? ? 用dict之前是这样实现的:

? ??

?

这操作是在shell计算,20万条数据花费 141s    FSum = fun({FollowedId, Count}, Acc) ->                Value = proplists:get_value(FollowedId, Acc, 0),                [{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]           end,    lists:foldr(FSum, [], Records); 
?

?

? ? ?用dict首先想到这样实现:

? ??

这操作是在shell计算,20万条数据花费 12s    FSum = fun({FollowedId, Count}, Acc) ->                case  dict:is_key(FollowedId, Acc) of                    true ->                        Value = dict:fetch(FollowedId, Acc),                        dict:store(FollowedId, Count+Value, Acc);                    false ->                        dict:store(FollowedId, Count, Acc)                end            end,    lists:foldr(FSum, dict:new(), Records);    
? ? ?缺点:用dict时如果直接用dict:fetch/2函数时,如果K不存在会抛出一个异常错误,这也是我平常不用dict的原因懒的每次都调用dict:is_key/2判断。这里判断一次,取出一次,存储一次总共判断了三或两次。 这操作是在shell计算,20万条数据花费 10s FSum2 = fun({FollowedId, Count}, Acc) -> case dict:is_key(FollowedId, Acc) of true -> dict:update_counter(FollowedId, Count, Acc); false -> dict:store(FollowedId, Count, Acc) end end, lists:foldr(FSum2, dict:new(), Records);?这操作是在shell计算,20万条数据花费 3s FSum3 = fun({FollowedId, Count}, Acc) -> dict:update_counter(FollowedId, Count, Acc) end, lists:foldr(FSum3, dict:new(), Records);

Add?Increment?to the value associated with?Key?and store this value. If?Key?is not present in the dictionary then?Incrementwill be stored as the first value.

?

? ?这时看上去已经省去了不少时间了由141s降到了3s。? 返回一个元素的list,[dict()] reduce(Records, {sub_rank}) -> FSum3 = fun({FollowedId, Count}, Acc) -> dict:update_counter(FollowedId, Count, Acc); (Dict, Acc) -> dict:merge(fun(_K, V, V1) ->V+V1 end, Dict, Acc) end, Return = lists:foldr(FSum3, dict:new(), Records), [Return];顺便也得重写第二个reduce% 取出前Max个reduce([Records], {sub_rank, Max}) when is_integer(Max) -> lists:sublist(lists:reverse(lists:keysort(2,dict:to_list(Records))), Max);

? ?这下终于算是OK ,

??

? Query代码:执行花费12s

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},          {reduce,{modfun,trend_riak,reduce},{sub_rank},false},          {reduce,{modfun,trend_riak,reduce},{sub_rank, 50},true}],  

? 根据这个最后的思路加上第二个reduce也能实现分页功能了

?

?

?

?

?

读书人网 >编程

热点推荐