From 42a3f5af31d7687a0c70b6dc347a8e676743359f Mon Sep 17 00:00:00 2001 From: Levi McAuley Date: Fri, 26 Feb 2016 17:31:34 -0800 Subject: [PATCH] Get running total workload size from work queue --- src/couch_mrview_updater.erl | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/couch_mrview_updater.erl b/src/couch_mrview_updater.erl index 0745c2a..ae77bc8 100644 --- a/src/couch_mrview_updater.erl +++ b/src/couch_mrview_updater.erl @@ -20,7 +20,7 @@ -define(REM_VAL, removed). -start_update(Partial, State, NumChanges) -> +start_update(Partial, State, _NumChanges) -> QueueOpts = [{max_size, 100000}, {max_items, 500}], {ok, DocQueue} = couch_work_queue:new(QueueOpts), {ok, WriteQueue} = couch_work_queue:new(QueueOpts), @@ -41,7 +41,7 @@ start_update(Partial, State, NumChanges) -> {design_document, State#mrst.idx_name}, {progress, 0}, {changes_done, 0}, - {total_changes, NumChanges} + {total_changes, 0} ]), couch_task_status:set_update_frequency(500), map_docs(Self, InitState) @@ -133,7 +133,7 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 -> - couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)), + couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc), length(Acc)), process_doc(Doc, Seq, State#mrst{doc_acc=[]}); process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) -> {ok, State#mrst{doc_acc=[{nil, Seq, nil, nil} | Acc]}}; @@ -151,7 +151,7 @@ extract_rev({RevPos, [Rev | _]}) -> finish_update(#mrst{doc_acc=Acc}=State) -> if Acc /= [] -> - couch_work_queue:queue(State#mrst.doc_queue, Acc); + couch_work_queue:queue(State#mrst.doc_queue, Acc, length(Acc)); true -> ok end, couch_work_queue:close(State#mrst.doc_queue), @@ -169,6 +169,11 @@ finish_update(#mrst{doc_acc=Acc}=State) -> map_docs(Parent, State0) -> + case couch_work_queue:total_in(State0#mrst.doc_queue) of + closed -> ok; + NewTotal -> update_task_total(NewTotal) + end, + case couch_work_queue:dequeue(State0#mrst.doc_queue) of closed -> couch_query_servers:stop_doc_map(State0#mrst.qserver), @@ -192,7 +197,7 @@ map_docs(Parent, State0) -> {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]} end, FoldFun = fun(Docs, Acc) -> - update_task(length(Docs)), + update_task_processed(length(Docs)), lists:foldl(DocFun, Acc, Docs) end, Results = lists:foldl(FoldFun, {0, []}, Dequeued), @@ -458,7 +463,7 @@ send_partial(_, _) -> ok. -update_task(NumChanges) -> +update_task_processed(NumChanges) -> [Changes, Total] = couch_task_status:get([changes_done, total_changes]), Changes2 = Changes + NumChanges, Progress = case Total of @@ -471,6 +476,10 @@ update_task(NumChanges) -> couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]). +update_task_total(Total) -> + couch_task_status:update([{total_changes, Total}]). + + maybe_notify(State, View, KVs, ToRem) -> Updated = fun() -> [Key || {{Key, _}, _} <- KVs]