handle crawl timestamps in db

commit f0f3f76f6c (parent 1e4cebb26b)
@@ -119,14 +119,24 @@ defmodule Backend.Crawler do
       user_count: result.user_count,
       status_count: result.status_count,
       type: instance_type,
-      base_domain: get_base_domain(domain)
+      base_domain: get_base_domain(domain),
+      next_crawl: NaiveDateTime.add(now, get_config(:crawl_interval_mins) * 60, :second)
     }

     Repo.insert!(
       instance,
       on_conflict:
         {:replace,
-         [:description, :version, :user_count, :status_count, :type, :base_domain, :updated_at]},
+         [
+           :description,
+           :version,
+           :user_count,
+           :status_count,
+           :type,
+           :base_domain,
+           :updated_at,
+           :next_crawl
+         ]},
       conflict_target: :domain
     )

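For readers less familiar with Ecto upserts, a minimal sketch of the pattern this hunk extends (the domain value and the shortened column list are made up for illustration):

# Illustrative only: how Repo.insert!/2 behaves with these options.
now = NaiveDateTime.utc_now()
instance = %Backend.Instance{domain: "example.social", user_count: 10, next_crawl: now}

Backend.Repo.insert!(instance,
  on_conflict: {:replace, [:user_count, :next_crawl, :updated_at]},
  conflict_target: :domain
)
# A second insert! for "example.social" overwrites only the listed columns on
# the existing row instead of raising a unique-constraint error.
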
@@ -153,7 +163,7 @@ defmodule Backend.Crawler do

     peers =
       peers_domains
-      |> Enum.map(&%{domain: &1, inserted_at: now, updated_at: now})
+      |> Enum.map(&%{domain: &1, inserted_at: now, updated_at: now, next_crawl: now})

     Instance
     |> Repo.insert_all(peers, on_conflict: :nothing, conflict_target: :domain)

@@ -220,6 +230,8 @@ defmodule Backend.Crawler do
   end

   defp save(%{domain: domain, error: error, allows_crawling?: allows_crawling}) do
+    now = get_now()
+
     error =
       cond do
         not allows_crawling -> "robots.txt"

@@ -227,13 +239,22 @@ defmodule Backend.Crawler do
         true -> "unknown error"
       end

+    # The "+1" is this error!
+    error_count = get_recent_crawl_error_count(domain) + 1
+    # The crawl interval grows exponentially at first but never goes above 24 hours
+    crawl_interval_mins =
+      min(get_config(:crawl_interval_mins) * round(:math.pow(2, error_count)), 1440)
+
+    next_crawl = NaiveDateTime.add(now, crawl_interval_mins * 60, :second)
+
     Repo.transaction(fn ->
       Repo.insert!(
         %Instance{
           domain: domain,
-          base_domain: get_base_domain(domain)
+          base_domain: get_base_domain(domain),
+          next_crawl: next_crawl
         },
-        on_conflict: {:replace, [:base_domain]},
+        on_conflict: {:replace, [:next_crawl]},
         conflict_target: :domain
       )

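The backoff added to save/1 doubles the recrawl interval per consecutive error and caps it at a day. A quick worked example, assuming get_config(:crawl_interval_mins) returns 30 (an assumed value, not necessarily this project's default):

# Worked example of the backoff formula above; base interval of 30 is assumed.
base = 30

for error_count <- 1..7 do
  mins = min(base * round(:math.pow(2, error_count)), 1440)
  IO.puts("#{error_count} consecutive errors -> next crawl in #{mins} min")
end

# Prints 60, 120, 240, 480, 960, 1440 (capped from 1920), 1440.
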
@@ -1,6 +1,6 @@
 defmodule Backend.Crawler.StaleInstanceManager do
   use GenServer
-  alias Backend.{Crawl, Instance, Repo}
+  alias Backend.{Instance, Repo}
   import Ecto.Query
   import Backend.Util
   require Logger

@@ -49,10 +49,14 @@ defmodule Backend.Crawler.StaleInstanceManager do
   end

   defp queue_stale_domains() do
+    now = get_now()
+
     stale_domains =
-      get_live_domains_to_crawl()
-      |> MapSet.union(get_dead_domains_to_crawl())
-      |> MapSet.union(get_new_domains_to_crawl())
+      Instance
+      |> select([i], i.domain)
+      |> where([i], i.next_crawl < ^now)
+      |> Repo.all()
+      |> MapSet.new()

     # Don't add a domain that's already in the queue
     domains_in_queue = get_domains_in_queue(stale_domains)

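With next_crawl precomputed at write time, queue selection becomes a single range scan over the indexed column. To see the generated SQL, standard Ecto tooling works (the exact output string varies by Ecto/adapter version; the query below is reassembled here for illustration):

import Ecto.Query

query = from(i in Backend.Instance, select: i.domain, where: i.next_crawl < ^NaiveDateTime.utc_now())
Ecto.Adapters.SQL.to_sql(:all, Backend.Repo, query)
# => {"SELECT i0.\"domain\" FROM \"instances\" AS i0 WHERE (i0.\"next_crawl\" < $1)", [...]}
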
@@ -68,118 +72,6 @@ defmodule Backend.Crawler.StaleInstanceManager do
       {:run, [domain]} |> Honeydew.async(:crawl_queue)
     end

-  # Handles instances where the most recent crawl was successful
-  @spec get_live_domains_to_crawl() :: MapSet.t()
-  defp get_live_domains_to_crawl() do
-    interval_mins = -1 * get_config(:crawl_interval_mins)
-
-    most_recent_crawl_subquery =
-      Crawl
-      |> select([c], %{
-        instance_domain: c.instance_domain,
-        inserted_at: max(c.inserted_at)
-      })
-      |> group_by([c], c.instance_domain)
-
-    Instance
-    |> join(:left, [i], most_recent_crawl in subquery(most_recent_crawl_subquery),
-      on: i.domain == most_recent_crawl.instance_domain
-    )
-    # Joining on a timestamp is really gross, but since we're joining on a timestamp in the same table, we should be OK.
-    |> join(:left, [i, most_recent_crawl], crawls in Crawl,
-      on:
-        i.domain == crawls.instance_domain and most_recent_crawl.inserted_at == crawls.inserted_at
-    )
-    |> where(
-      [i, most_recent_crawl, crawls],
-      is_nil(crawls.error) and
-        most_recent_crawl.inserted_at <
-          datetime_add(^NaiveDateTime.utc_now(), ^interval_mins, "minute") and not i.opt_out
-    )
-    |> select([i], i.domain)
-    |> Repo.all()
-    |> MapSet.new()
-  end
-
-  # Handles instances that have never been crawled at all.
-  @spec get_new_domains_to_crawl() :: MapSet.t()
-  defp get_new_domains_to_crawl() do
-    all_crawls_subquery =
-      Crawl
-      |> select([c], %{
-        instance_domain: c.instance_domain,
-        crawl_count: count(c.id)
-      })
-      |> group_by([c], c.instance_domain)
-
-    Instance
-    |> join(:left, [i], c in subquery(all_crawls_subquery), on: i.domain == c.instance_domain)
-    |> where([i, c], (is_nil(c.crawl_count) or c.crawl_count == 0) and not i.opt_out)
-    |> select([i], i.domain)
-    |> Repo.all()
-    |> MapSet.new()
-  end
-
-  # Handles instances where the previous crawl(s) were unsuccessful.
-  # These are crawled with an increasing delay
-  @spec get_dead_domains_to_crawl() :: MapSet.t()
-  defp get_dead_domains_to_crawl() do
-    now = get_now()
-    interval_mins = get_config(:crawl_interval_mins)
-
-    most_recent_successful_crawl_subquery =
-      Crawl
-      |> select([c], %{
-        instance_domain: c.instance_domain,
-        timestamp: max(c.inserted_at)
-      })
-      |> where([c], is_nil(c.error))
-      |> group_by([c], c.instance_domain)
-
-    Instance
-    |> join(
-      :left,
-      [i],
-      most_recent_successful_crawl in subquery(most_recent_successful_crawl_subquery),
-      on: i.domain == most_recent_successful_crawl.instance_domain
-    )
-    |> join(:left, [i, most_recent_successful_crawl_subquery], crawls in Crawl,
-      on: i.domain == crawls.instance_domain
-    )
-    |> select([i, most_recent_successful_crawl, crawls], %{
-      domain: i.domain,
-      most_recent_crawl: max(crawls.inserted_at),
-      failed_crawls: count(crawls.id)
-    })
-    |> group_by([i, most_recent_successful_crawl, crawls], i.domain)
-    |> where(
-      [i, most_recent_successful_crawl, crawls],
-      crawls.inserted_at > most_recent_successful_crawl.timestamp and not i.opt_out
-    )
-    |> Repo.all()
-    # We now have a list of domains, the # of failed crawls, and the most recent crawl timestamp.
-    # Now we filter down to those that should be crawled now.
-    |> Enum.map(fn %{
-                     domain: domain,
-                     most_recent_crawl: most_recent_crawl,
-                     failed_crawls: failed_crawls
-                   } ->
-      # The interval is never more than 24 hours
-      curr_interval = min(1440, interval_mins * round(:math.pow(2, failed_crawls)))
-      next_crawl = NaiveDateTime.add(most_recent_crawl, curr_interval * 60, :second)
-
-      %{
-        domain: domain,
-        next_crawl: next_crawl
-      }
-    end)
-    |> Enum.filter(fn %{next_crawl: next_crawl} ->
-      NaiveDateTime.compare(now, next_crawl) == :gt
-    end)
-    |> Enum.map(fn %{domain: domain} -> domain end)
-    |> MapSet.new()
-  end
-
   @spec get_domains_in_queue(MapSet.t()) :: MapSet.t()
   defp get_domains_in_queue(domains) do
     Honeydew.filter(:crawl_queue, fn job ->

@@ -14,6 +14,7 @@ defmodule Backend.Instance do
     field :base_domain, :string
     field :opt_in, :boolean
     field :opt_out, :boolean
+    field :next_crawl, :naive_datetime

     many_to_many :peers, Backend.Instance,
       join_through: Backend.InstancePeer,

@@ -43,7 +44,8 @@ defmodule Backend.Instance do
       :statuses_per_day,
       :base_domain,
       :opt_in,
-      :opt_out
+      :opt_out,
+      :next_crawl
     ])
     |> validate_required([:domain])
     |> put_assoc(:peers, attrs.peers)

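Presumably the changeset is then exercised as below; the changeset/2 function head and the empty peers list aren't visible in this hunk, so treat both as assumptions:

# Sketch: next_crawl now flows through cast like the other fields.
attrs = %{domain: "example.social", next_crawl: ~N[2019-07-15 00:00:00], peers: []}

%Backend.Instance{}
|> Backend.Instance.changeset(attrs)
|> Backend.Repo.insert()
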
@@ -167,4 +167,28 @@ defmodule Backend.Util do
   def convert_keys_to_atoms(map) do
     map |> Map.new(fn {k, v} -> {String.to_atom(k), v} end)
   end
+
+  # Given a domain, returns how many of its most recent crawls errored (i.e. consecutive errors since the last successful crawl)
+  @spec get_recent_crawl_error_count(String.t()) :: integer
+  def get_recent_crawl_error_count(domain) do
+    most_recent_success_crawl_subquery =
+      Crawl
+      |> select([c], %{
+        instance_domain: c.instance_domain,
+        timestamp: max(c.inserted_at)
+      })
+      |> where([c], c.instance_domain == ^domain and is_nil(c.error))
+      |> group_by([c], c.instance_domain)
+
+    Crawl
+    |> join(:left, [c1], c2 in subquery(most_recent_success_crawl_subquery),
+      on: c1.instance_domain == c2.instance_domain
+    )
+    |> where(
+      [c1, c2],
+      c1.instance_domain == ^domain and (c1.inserted_at > c2.timestamp or is_nil(c2.timestamp))
+    )
+    |> select([c1, c2], count(c1.id))
+    |> Repo.one()
+  end
 end

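To make the counting rule concrete (the domain and its crawl history below are invented for illustration):

# Suppose "example.social" has five crawls, oldest first: ok, ok, error, error, error.
# The subquery finds the inserted_at of the last ok; the outer query counts the
# crawls strictly after it:
Backend.Util.get_recent_crawl_error_count("example.social")
# => 3
# A domain with no successful crawl hits the is_nil(c2.timestamp) branch,
# so all of its crawls are counted.
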
@@ -0,0 +1,11 @@
+defmodule Backend.Repo.Migrations.AddNextCrawlToInstances do
+  use Ecto.Migration
+
+  def change do
+    alter table(:instances) do
+      add :next_crawl, :naive_datetime
+    end
+
+    create index(:instances, [:next_crawl])
+  end
+end

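One caveat worth flagging: rows that predate this migration are left with next_crawl = NULL, and the new `i.next_crawl < ^now` filter never matches NULL. A backfill along these lines (e.g. in a follow-up migration) would queue them; this is a sketch, not part of the commit:

# Hypothetical backfill (NOT in this commit): give pre-existing instances an
# immediate next_crawl so the stale-domain query picks them up.
execute("UPDATE instances SET next_crawl = NOW() WHERE next_crawl IS NULL")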