277 lines
14 KiB
Python
277 lines
14 KiB
Python
"""
|
|
This script starts at a seed instance and loads the list of connected
|
|
peers. From there, it scrapes the peers of all instances it finds,
|
|
gradually mapping the fediverse.
|
|
"""
|
|
import json
|
|
import multiprocessing as mp
|
|
import requests
|
|
import time
|
|
import os
|
|
from dateutil.parser import parse as datetime_parser
|
|
from datetime import datetime, timedelta, timezone
|
|
from functional import seq
|
|
from django_bulk_update.helper import bulk_update
|
|
from django.core.management.base import BaseCommand
|
|
from django import db
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from scraper.models import Instance, PeerRelationship
|
|
from scraper.management.commands._util import require_lock, InvalidResponseException, get_key, log, validate_int, PersonalInstanceException, BlacklistedDomainException
|
|
|
|
# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances
|
|
|
|
SEED = 'mastodon.social'
|
|
TIMEOUT = 20 # seconds
|
|
NUM_THREADS = 16 # roughly 40MB each
|
|
PERSONAL_INSTANCE_THRESHOLD = 10 # instances with < this many users won't be crawled
|
|
MAX_STATUSES_PER_PAGE = 40
|
|
STATUS_SCRAPE_LIMIT = 5000
|
|
INSTANCE_SCRAPE_LIMIT = 50 # note: this does not include newly discovered instances! they will always be crawled.
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Scrapes the entire fediverse"
|
|
|
|
def add_arguments(self, parser):
|
|
# Named (optional) arguments
|
|
parser.add_argument(
|
|
'--unlimited',
|
|
action='store_true',
|
|
dest='unlimited',
|
|
help="Crawl all stale instances rather than limiting to {}".format(INSTANCE_SCRAPE_LIMIT),
|
|
)
|
|
parser.add_argument(
|
|
'--all',
|
|
action='store_true',
|
|
dest='all',
|
|
help="Crawl all instances rather than limiting to stale ones"
|
|
)
|
|
parser.add_argument(
|
|
'--verbose',
|
|
action='store_true',
|
|
dest='verbose',
|
|
help="Verbose logging"
|
|
)
|
|
parser.add_argument(
|
|
'--instance',
|
|
dest='instance',
|
|
help="Crawl a single instance"
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.verbose = False
|
|
self.scraped_count = 0
|
|
f = open(os.path.join(settings.BASE_DIR, '../whitelist.txt'), 'r')
|
|
self.whitelist = seq(f.readlines()).map(lambda i: i.lower().strip()).to_list()
|
|
f.close()
|
|
|
|
def get_instance_info(self, instance_name: str):
|
|
"""Collect info about instance"""
|
|
url = 'https://' + instance_name + '/api/v1/instance'
|
|
response = requests.get(url, timeout=TIMEOUT)
|
|
json = response.json()
|
|
if response.status_code != 200 or get_key(json, ['error']):
|
|
if self.verbose:
|
|
log(self, "Couldn't get instance info for {}: {}".format(instance_name, response), error=True)
|
|
raise InvalidResponseException("Could not get info for {}".format(instance_name))
|
|
return json
|
|
|
|
def get_instance_peers(self, instance_name: str):
|
|
"""Collect connected instances"""
|
|
# The peers endpoint returns a "list of all domain names known to this instance"
|
|
# (https://github.com/tootsuite/mastodon/pull/6125)
|
|
url = 'https://' + instance_name + '/api/v1/instance/peers'
|
|
response = requests.get(url, timeout=TIMEOUT)
|
|
peers = response.json()
|
|
if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
|
|
if self.verbose:
|
|
log(self, "Couldn't get peers for {}: {}".format(instance_name, response), error=True)
|
|
raise InvalidResponseException("Could not get peers for {}".format(instance_name))
|
|
# Get rid of peers that just say "null" and the instance itself
|
|
# Also make sure to lowercase all instance names and remove duplicates
|
|
return list(set([peer.lower() for peer in peers if peer and peer != instance_name]))
|
|
|
|
def get_statuses(self, instance_name: str):
|
|
"""Collect all statuses that mention users on other instances"""
|
|
mentions = []
|
|
datetime_threshold = datetime.now(timezone.utc) - timedelta(days=31)
|
|
statuses_seen = 0
|
|
# We'll ask for lots of statuses, but Mastodon never returns more than 40. Some Pleroma instances will ignore
|
|
# the limit and return 20.
|
|
url = 'https://{}/api/v1/timelines/public?local=true&limit={}'.format(instance_name, MAX_STATUSES_PER_PAGE)
|
|
while True:
|
|
if self.verbose:
|
|
log(self, "({} posts seen)\tGetting {}".format(statuses_seen, url))
|
|
response = requests.get(url, timeout=TIMEOUT)
|
|
statuses = response.json()
|
|
if response.status_code != 200 or get_key(statuses, ['error']):
|
|
if self.verbose:
|
|
log(self, "Couldn't get statuses for {}: {}".format(instance_name, response), error=True)
|
|
raise InvalidResponseException("Could not get statuses for {}".format(instance_name))
|
|
elif len(statuses) == 0:
|
|
break
|
|
# Get mentions from this instance
|
|
mentions.extend((seq(statuses)
|
|
.filter(lambda s: datetime_parser(s['created_at']) > datetime_threshold)
|
|
.flat_map(lambda s: s['mentions']))) # map to mentions
|
|
|
|
# Find out if we should stop here
|
|
earliest_status = statuses[-1]
|
|
earliest_time_seen = datetime_parser(earliest_status['created_at'])
|
|
statuses_seen += len(statuses)
|
|
# Mastodon returns max 40 statuses; if we ever see less than that we know there aren't any more
|
|
if earliest_time_seen < datetime_threshold or statuses_seen >= STATUS_SCRAPE_LIMIT:
|
|
break
|
|
# Continuing, so get url for next page
|
|
min_id = earliest_status['id']
|
|
url = 'https://{}/api/v1/timelines/public?local=true&limit={}&max_id={}'.format(instance_name, MAX_STATUSES_PER_PAGE, min_id)
|
|
time.sleep(2) # Sleep to avoid overloading the instance
|
|
|
|
mentions_seq = (seq(mentions)
|
|
.filter(lambda m: not m['acct'].endswith(instance_name) and '@' in m['acct'])
|
|
.map(lambda m: m['acct'].split('@')[-1]) # map to instance name
|
|
.map(lambda m: (m, 1))
|
|
.reduce_by_key(lambda x, y: x+y)) # sequence of tuples (instance, count)
|
|
mentions_by_instance = {t[0]: t[1] for t in mentions_seq} # dict of instance -> number of mentions
|
|
|
|
return mentions_by_instance, statuses_seen
|
|
|
|
def process_instance(self, instance: Instance):
|
|
"""Given an instance, get all the data we're interested in"""
|
|
data = dict()
|
|
try:
|
|
if instance.name.endswith("gab.best"):
|
|
raise BlacklistedDomainException
|
|
|
|
data['instance_name'] = instance.name
|
|
data['info'] = self.get_instance_info(instance.name)
|
|
|
|
# Check if this is a personal instance before continuing
|
|
user_count = get_key(data, ['info', 'stats', 'user_count'])
|
|
if isinstance(user_count, int)\
|
|
and user_count < PERSONAL_INSTANCE_THRESHOLD\
|
|
and instance.name not in self.whitelist:
|
|
raise PersonalInstanceException
|
|
|
|
data['peers'] = self.get_instance_peers(instance.name)
|
|
if not data['info'] and not data['peers']:
|
|
# We got a response from the instance, but it didn't have any of the information we were expecting.
|
|
raise InvalidResponseException
|
|
|
|
data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
|
|
data['status'] = 'success'
|
|
return data
|
|
|
|
except (InvalidResponseException,
|
|
PersonalInstanceException,
|
|
BlacklistedDomainException,
|
|
requests.exceptions.RequestException,
|
|
json.decoder.JSONDecodeError) as e:
|
|
data['instance_name'] = instance.name
|
|
data['status'] = type(e).__name__
|
|
return data
|
|
|
|
@db.transaction.atomic
|
|
@require_lock(Instance, 'ACCESS EXCLUSIVE')
|
|
def save_data(self, instance, data, queue, existing_instance_ids):
|
|
"""Save data"""
|
|
# Validate the ints. Some servers that appear to be fake instances have e.g. negative numbers here.
|
|
instance.domain_count = validate_int(get_key(data, ['info', 'stats', 'domain_count']))
|
|
instance.status_count = validate_int(get_key(data, ['info', 'stats', 'status_count']))
|
|
instance.user_count = validate_int(get_key(data, ['info', 'stats', 'user_count']))
|
|
instance.description = get_key(data, ['info', 'description'])
|
|
instance.version = get_key(data, ['info', 'version'])
|
|
instance.status = get_key(data, ['status'])
|
|
instance.last_updated = timezone.now()
|
|
instance.save()
|
|
if data['status'] == 'success' and data['peers']:
|
|
# TODO: handle a peer disappeer-ing
|
|
# Create instances for the peers we haven't seen before and add them to the queue
|
|
new_instance_ids = [peer_id for peer_id in data['peers'] if peer_id not in existing_instance_ids]
|
|
# bulk_create doesn't call save(), so the auto_now_add field won't get set automatically
|
|
new_instances = [Instance(name=id, first_seen=datetime.now(), last_updated=datetime.utcfromtimestamp(0))
|
|
for id in new_instance_ids]
|
|
existing_instance_ids.extend(new_instance_ids)
|
|
Instance.objects.bulk_create(new_instances)
|
|
for new_instance in new_instances:
|
|
queue.put(new_instance)
|
|
|
|
# Create relationships we haven't seen before
|
|
existing_peer_ids = PeerRelationship.objects.filter(source=instance).values_list('target', flat=True)
|
|
new_peer_ids = [peer_id for peer_id in data['peers'] if peer_id not in existing_peer_ids]
|
|
if new_peer_ids:
|
|
# new_peers = Instance.objects.filter(name__in=new_peer_ids)
|
|
new_relationships = [PeerRelationship(source=instance, target_id=new_peer, first_seen=datetime.now())
|
|
for new_peer in new_peer_ids]
|
|
PeerRelationship.objects.bulk_create(new_relationships)
|
|
|
|
if data['status'] == 'success' and data['mentions']:
|
|
# At this point, we can assume that a relationship exists for every peer that's mentioned in statuses
|
|
mentions = data['mentions']
|
|
relationships = PeerRelationship.objects.filter(source=instance,
|
|
target_id__in=list(mentions.keys()))
|
|
for relationship in relationships:
|
|
relationship.mention_count = mentions[relationship.target_id]
|
|
relationship.statuses_seen = data['statuses_seen']
|
|
relationship.last_updated = datetime.now()
|
|
bulk_update(relationships, update_fields=['mention_count', 'statuses_seen', 'last_updated'])
|
|
|
|
log(self, "Processed {}: {}".format(data['instance_name'], data['status']))
|
|
|
|
def worker(self, queue: mp.JoinableQueue, existing_instance_ids, scraped_ids):
|
|
"""The main worker that processes instances"""
|
|
db.connections.close_all() # https://stackoverflow.com/a/38356519/3697202
|
|
while True:
|
|
instance = queue.get()
|
|
if instance.name in scraped_ids:
|
|
# If we hit this branch, it's indicative of a bug
|
|
log(self, "Skipping {}, already done. This should not have been added to the queue!".format(instance),
|
|
error=True)
|
|
queue.task_done()
|
|
else:
|
|
# Fetch data on instance
|
|
log(self, "Processing {}".format(instance.name))
|
|
data = self.process_instance(instance)
|
|
self.save_data(instance, data, queue, existing_instance_ids)
|
|
scraped_ids[instance.name] = 1
|
|
queue.task_done()
|
|
|
|
def handle(self, *args, **options):
|
|
start_time = time.time()
|
|
|
|
self.verbose = options['verbose']
|
|
|
|
if options['instance']:
|
|
stale_instance, _ = Instance.objects.get_or_create(name=options['instance'])
|
|
stale_instances = [stale_instance]
|
|
elif options['all']:
|
|
stale_instances = Instance.objects.all()
|
|
else:
|
|
stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(days=1))
|
|
|
|
if not options['unlimited']:
|
|
stale_instances = stale_instances[:INSTANCE_SCRAPE_LIMIT]
|
|
|
|
with mp.Manager() as manager:
|
|
# Share the list of existing instances amongst all threads (to avoid each thread having to query
|
|
# for it on every instance it scrapes)
|
|
existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
|
|
scraped_ids = manager.dict()
|
|
queue = mp.JoinableQueue()
|
|
if stale_instances:
|
|
for instance in stale_instances:
|
|
queue.put(instance)
|
|
elif not Instance.objects.exists():
|
|
instance, _ = Instance.objects.get_or_create(name=SEED)
|
|
existing_instance_ids.append(instance.name)
|
|
queue.put(instance)
|
|
|
|
pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids, scraped_ids))
|
|
queue.join()
|
|
self.scraped_count = len(scraped_ids.keys())
|
|
|
|
end_time = time.time()
|
|
log(self, "Scraped {} instances in {:.0f}s".format(self.scraped_count, end_time - start_time), True)
|