set infinite timeout on v expensive queries
This commit is contained in:
parent
012fc1d85a
commit
d9b9081ec3
|
@ -25,7 +25,7 @@ defmodule Backend.Scheduler do
|
||||||
i.inserted_at <
|
i.inserted_at <
|
||||||
datetime_add(^NaiveDateTime.utc_now(), -1 * ^amount, ^unit)
|
datetime_add(^NaiveDateTime.utc_now(), -1 * ^amount, ^unit)
|
||||||
)
|
)
|
||||||
|> Repo.delete_all()
|
|> Repo.delete_all(timeout: :infinity)
|
||||||
|
|
||||||
Logger.info("Pruned #{deleted_num} old crawls.")
|
Logger.info("Pruned #{deleted_num} old crawls.")
|
||||||
end
|
end
|
||||||
|
@ -59,7 +59,7 @@ defmodule Backend.Scheduler do
|
||||||
# we can take min() because every row is the same
|
# we can take min() because every row is the same
|
||||||
interactions: min(c.interactions_seen)
|
interactions: min(c.interactions_seen)
|
||||||
})
|
})
|
||||||
|> Repo.all()
|
|> Repo.all(timeout: :infinity)
|
||||||
|> Enum.map(fn %{domain: domain, mentions: mentions, interactions: interactions} ->
|
|> Enum.map(fn %{domain: domain, mentions: mentions, interactions: interactions} ->
|
||||||
%{
|
%{
|
||||||
domain: domain,
|
domain: domain,
|
||||||
|
@ -72,7 +72,8 @@ defmodule Backend.Scheduler do
|
||||||
Instance
|
Instance
|
||||||
|> Repo.insert_all(scores,
|
|> Repo.insert_all(scores,
|
||||||
on_conflict: {:replace, [:insularity, :updated_at]},
|
on_conflict: {:replace, [:insularity, :updated_at]},
|
||||||
conflict_target: :domain
|
conflict_target: :domain,
|
||||||
|
timeout: :infinity
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -107,7 +108,7 @@ defmodule Backend.Scheduler do
|
||||||
second_earliest_crawl: min(c.inserted_at)
|
second_earliest_crawl: min(c.inserted_at)
|
||||||
})
|
})
|
||||||
|> group_by([c], c.instance_domain)
|
|> group_by([c], c.instance_domain)
|
||||||
|> Repo.all()
|
|> Repo.all(timeout: :infinity)
|
||||||
|> Enum.map(fn %{
|
|> Enum.map(fn %{
|
||||||
instance_domain: domain,
|
instance_domain: domain,
|
||||||
status_count: status_count,
|
status_count: status_count,
|
||||||
|
@ -128,7 +129,8 @@ defmodule Backend.Scheduler do
|
||||||
Instance
|
Instance
|
||||||
|> Repo.insert_all(instances,
|
|> Repo.insert_all(instances,
|
||||||
on_conflict: {:replace, [:statuses_per_day, :updated_at]},
|
on_conflict: {:replace, [:statuses_per_day, :updated_at]},
|
||||||
conflict_target: :domain
|
conflict_target: :domain,
|
||||||
|
timeout: :infinity
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -167,53 +169,57 @@ defmodule Backend.Scheduler do
|
||||||
source_statuses_seen: min(c_source.statuses_seen),
|
source_statuses_seen: min(c_source.statuses_seen),
|
||||||
target_statuses_seen: min(c_target.statuses_seen)
|
target_statuses_seen: min(c_target.statuses_seen)
|
||||||
})
|
})
|
||||||
|> Repo.all()
|
|> Repo.all(timeout: :infinity)
|
||||||
|
|
||||||
# Get edges and their weights
|
# Get edges and their weights
|
||||||
Repo.transaction(fn ->
|
Repo.transaction(
|
||||||
Edge
|
fn ->
|
||||||
|> Repo.delete_all()
|
Edge
|
||||||
|
|> Repo.delete_all(timeout: :infinity)
|
||||||
|
|
||||||
edges =
|
edges =
|
||||||
interactions
|
interactions
|
||||||
# Get a map of %{{source, target} => {total_mention_count, total_statuses_seen}}
|
# Get a map of %{{source, target} => {total_mention_count, total_statuses_seen}}
|
||||||
|> Enum.reduce(%{}, fn
|
|> Enum.reduce(%{}, fn
|
||||||
%{
|
%{
|
||||||
source_domain: source_domain,
|
source_domain: source_domain,
|
||||||
target_domain: target_domain,
|
target_domain: target_domain,
|
||||||
mentions: mentions,
|
mentions: mentions,
|
||||||
source_statuses_seen: source_statuses_seen,
|
source_statuses_seen: source_statuses_seen,
|
||||||
target_statuses_seen: target_statuses_seen
|
target_statuses_seen: target_statuses_seen
|
||||||
},
|
},
|
||||||
acc ->
|
acc ->
|
||||||
key = get_interaction_key(source_domain, target_domain)
|
key = get_interaction_key(source_domain, target_domain)
|
||||||
|
|
||||||
# target_statuses_seen might be nil if that instance was never crawled. default to 0.
|
# target_statuses_seen might be nil if that instance was never crawled. default to 0.
|
||||||
target_statuses_seen =
|
target_statuses_seen =
|
||||||
case target_statuses_seen do
|
case target_statuses_seen do
|
||||||
nil -> 0
|
nil -> 0
|
||||||
_ -> target_statuses_seen
|
_ -> target_statuses_seen
|
||||||
end
|
end
|
||||||
|
|
||||||
statuses_seen = source_statuses_seen + target_statuses_seen
|
statuses_seen = source_statuses_seen + target_statuses_seen
|
||||||
|
|
||||||
Map.update(acc, key, {mentions, statuses_seen}, fn {curr_mentions, curr_statuses_seen} ->
|
Map.update(acc, key, {mentions, statuses_seen}, fn {curr_mentions,
|
||||||
{curr_mentions + mentions, curr_statuses_seen}
|
curr_statuses_seen} ->
|
||||||
end)
|
{curr_mentions + mentions, curr_statuses_seen}
|
||||||
end)
|
end)
|
||||||
|> Enum.map(fn {{source_domain, target_domain}, {mention_count, statuses_seen}} ->
|
end)
|
||||||
%{
|
|> Enum.map(fn {{source_domain, target_domain}, {mention_count, statuses_seen}} ->
|
||||||
source_domain: source_domain,
|
%{
|
||||||
target_domain: target_domain,
|
source_domain: source_domain,
|
||||||
weight: mention_count / statuses_seen,
|
target_domain: target_domain,
|
||||||
inserted_at: now,
|
weight: mention_count / statuses_seen,
|
||||||
updated_at: now
|
inserted_at: now,
|
||||||
}
|
updated_at: now
|
||||||
end)
|
}
|
||||||
|
end)
|
||||||
|
|
||||||
Edge
|
Edge
|
||||||
|> Repo.insert_all(edges)
|
|> Repo.insert_all(edges, timeout: :infinity)
|
||||||
end)
|
end,
|
||||||
|
timeout: :infinity
|
||||||
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
Loading…
Reference in a new issue