add servers to watch instances
This commit is contained in:
parent
6da83e4bd1
commit
7ffc009d1d
|
@ -17,6 +17,8 @@ config :backend, BackendWeb.Endpoint,
|
|||
render_errors: [view: BackendWeb.ErrorView, accepts: ~w(json)],
|
||||
pubsub: [name: Backend.PubSub, adapter: Phoenix.PubSub.PG2]
|
||||
|
||||
config :backend, Backend.Repo, queue_target: 5000
|
||||
|
||||
# Configures Elixir's Logger
|
||||
config :logger, :console,
|
||||
format: "$time $metadata[$level] $message\n",
|
||||
|
@ -28,7 +30,8 @@ config :phoenix, :json_library, Jason
|
|||
config :backend, :crawler,
|
||||
status_age_limit_days: 28,
|
||||
status_count_limit: 100,
|
||||
personal_instance_threshold: 1
|
||||
personal_instance_threshold: 1,
|
||||
crawl_interval_mins: 30
|
||||
|
||||
# Import environment specific config. This must remain at the bottom
|
||||
# of this file so it overrides the configuration defined above.
|
||||
|
|
|
@ -4,6 +4,10 @@ defmodule Backend.Application do
|
|||
@moduledoc false
|
||||
|
||||
use Application
|
||||
alias Backend.Crawler.CrawlerSupervisor
|
||||
alias Backend.{Instance, Repo}
|
||||
import Ecto.Query
|
||||
require Logger
|
||||
|
||||
def start(_type, _args) do
|
||||
# List all child processes to be supervised
|
||||
|
@ -11,7 +15,9 @@ defmodule Backend.Application do
|
|||
# Start the Ecto repository
|
||||
Backend.Repo,
|
||||
# Start the endpoint when the application starts
|
||||
BackendWeb.Endpoint
|
||||
BackendWeb.Endpoint,
|
||||
CrawlerSupervisor,
|
||||
{Task, fn -> start_instance_servers() end}
|
||||
# Starts a worker by calling: Backend.Worker.start_link(arg)
|
||||
# {Backend.Worker, arg},
|
||||
]
|
||||
|
@ -28,4 +34,22 @@ defmodule Backend.Application do
|
|||
BackendWeb.Endpoint.config_change(changed, removed)
|
||||
:ok
|
||||
end
|
||||
|
||||
defp start_instance_servers() do
|
||||
domains =
|
||||
Instance
|
||||
|> select([:domain])
|
||||
|> Repo.all()
|
||||
|> Enum.map(fn i -> i.domain end)
|
||||
|
||||
# If we don't have any instances, start with m.s as the seed.
|
||||
domains =
|
||||
case length(domains) do
|
||||
0 -> ["mastodon.social"]
|
||||
_ -> domains
|
||||
end
|
||||
|
||||
domains
|
||||
|> Enum.each(fn domain -> CrawlerSupervisor.start_child_server(domain) end)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,7 +5,7 @@ defmodule Backend.Crawler.Crawler do
|
|||
|
||||
alias __MODULE__
|
||||
alias Backend.Crawler.Crawlers.Mastodon
|
||||
alias Backend.Crawler.ApiCrawler
|
||||
alias Backend.Crawler.{ApiCrawler, CrawlerSupervisor}
|
||||
alias Backend.{Repo, Instance, Interaction, InstancePeer}
|
||||
import Ecto.Query
|
||||
require Logger
|
||||
|
@ -49,7 +49,7 @@ defmodule Backend.Crawler.Crawler do
|
|||
# Recursive function to check whether `domain` has an API that the head of the api_crawlers list can read.
|
||||
# If so, crawls it. If not, continues with the tail of the api_crawlers list.
|
||||
defp crawl(%Crawler{api_crawlers: []} = state, domain) do
|
||||
Logger.info("Found no compatible API for #{domain}")
|
||||
Logger.debug("Found no compatible API for #{domain}")
|
||||
Map.put(state, :found_api?, false)
|
||||
end
|
||||
|
||||
|
@ -120,7 +120,7 @@ defmodule Backend.Crawler.Crawler do
|
|||
conflict_target: :domain
|
||||
)
|
||||
|
||||
# If we discovered new instances from the peers endpoint or from mentions, add them
|
||||
# If we discovered new instances from the peers endpoint or from mentions, save them and start a server to watch
|
||||
peers =
|
||||
peers_domains
|
||||
|> MapSet.new()
|
||||
|
@ -131,6 +131,9 @@ defmodule Backend.Crawler.Crawler do
|
|||
Instance
|
||||
|> Repo.insert_all(peers, on_conflict: :nothing, conflict_target: :domain)
|
||||
|
||||
peers_domains
|
||||
|> Enum.each(fn domain -> CrawlerSupervisor.start_child_server(domain) end)
|
||||
|
||||
## Save peer relationships ##
|
||||
Repo.transaction(fn ->
|
||||
# get current peers (a list of strings)
|
||||
|
@ -146,10 +149,12 @@ defmodule Backend.Crawler.Crawler do
|
|||
# delete the peers we don't want
|
||||
dont_want = current_peers_set |> MapSet.difference(wanted_peers_set) |> MapSet.to_list()
|
||||
|
||||
InstancePeer
|
||||
|> where(source_domain: ^domain)
|
||||
|> where([p], p.target_domain in ^dont_want)
|
||||
|> Repo.delete_all([])
|
||||
if length(dont_want) > 0 do
|
||||
InstancePeer
|
||||
|> where(source_domain: ^domain)
|
||||
|> where([p], p.target_domain in ^dont_want)
|
||||
|> Repo.delete_all([])
|
||||
end
|
||||
|
||||
# insert the ones we don't have yet
|
||||
new_instance_peers =
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
defmodule Backend.Crawler.CrawlerSupervisor do
|
||||
use DynamicSupervisor
|
||||
require Logger
|
||||
alias Backend.Crawler.Server
|
||||
|
||||
def start_link(init_arg) do
|
||||
Logger.debug("Starting CrawlerSupervisor")
|
||||
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
DynamicSupervisor.init(strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def start_child_server(domain) do
|
||||
spec = {Server, domain: domain}
|
||||
DynamicSupervisor.start_child(__MODULE__, spec)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,46 @@
|
|||
defmodule Backend.Crawler.Server do
|
||||
use GenServer
|
||||
require Logger
|
||||
import Backend.Crawler.Util
|
||||
alias Backend.Crawler.Crawler
|
||||
|
||||
# Client
|
||||
def start_link(domain: domain) do
|
||||
# TODO: use registry instead!
|
||||
GenServer.start_link(__MODULE__, domain, name: String.to_atom(domain))
|
||||
end
|
||||
|
||||
# Server
|
||||
@impl true
|
||||
def init(domain) do
|
||||
# TODO: check if instance has opted out
|
||||
# instance = Repo.get_by!(Instance, domain: domain)
|
||||
Process.send(self(), :crawl, [])
|
||||
{:ok, domain}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:crawl, state) do
|
||||
now = NaiveDateTime.utc_now()
|
||||
last_crawl_timestamp = get_last_crawl(state)
|
||||
|
||||
if NaiveDateTime.diff(now, last_crawl_timestamp, :second) * 60 >
|
||||
get_config(:crawl_interval_mins) do
|
||||
Logger.debug(NaiveDateTime.diff(now, last_crawl_timestamp, :second))
|
||||
Crawler.run(state)
|
||||
schedule_crawl()
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# Workaround to handle Hackney bug: https://github.com/benoitc/hackney/issues/464
|
||||
def handle_info({:ssl_closed, _msg}, state) do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp schedule_crawl() do
|
||||
interval = get_config(:crawl_interval_mins) * 60_000
|
||||
Process.send_after(self(), :crawl, interval)
|
||||
end
|
||||
end
|
|
@ -1,4 +1,7 @@
|
|||
defmodule Backend.Crawler.Util do
|
||||
import Ecto.Query
|
||||
alias Backend.{Instance, Repo}
|
||||
|
||||
@spec get_config(atom) :: any
|
||||
def get_config(key) do
|
||||
Application.get_env(:backend, :crawler)[key]
|
||||
|
@ -35,4 +38,12 @@ defmodule Backend.Crawler.Util do
|
|||
|> Kernel.===(:gt)
|
||||
end
|
||||
end
|
||||
|
||||
@spec get_last_crawl(String.t()) :: NaiveDateTime.t()
|
||||
def get_last_crawl(domain) do
|
||||
Instance
|
||||
|> select([:last_crawl_timestamp])
|
||||
|> Repo.get_by!(domain: domain)
|
||||
|> Map.get(:last_crawl_timestamp)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue