index.community/backend/scraper/management/commands/scrape.py

273 lines
13 KiB
Python
Raw Normal View History

2018-08-26 01:17:10 +00:00
"""
This script starts at a seed instance and loads the list of connected
peers. From there, it scrapes the peers of all instances it finds,
gradually mapping the fediverse.
"""
import json
2018-09-01 18:46:00 +00:00
import multiprocessing as mp
2018-08-26 01:17:10 +00:00
import requests
2018-08-26 18:15:50 +00:00
import time
import os
from dateutil.parser import parse as datetime_parser
from datetime import datetime, timedelta, timezone
from functional import seq
from django_bulk_update.helper import bulk_update
2018-08-26 01:17:10 +00:00
from django.core.management.base import BaseCommand
from django import db
from django.conf import settings
2019-02-21 10:38:49 +00:00
from django.utils import timezone
from scraper.models import Instance, PeerRelationship
2018-09-02 22:36:03 +00:00
from scraper.management.commands._util import require_lock, InvalidResponseException, get_key, log, validate_int, PersonalInstanceException
2018-08-26 01:17:10 +00:00
2018-08-26 18:15:50 +00:00
# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances

# --- Crawl entry point ------------------------------------------------------
# Instance the crawl starts from when there is nothing else to scrape and the database is empty.
SEED = 'p.a3.pm'

# --- Networking / concurrency -----------------------------------------------
# Timeout (in seconds) passed to every requests.get call.
TIMEOUT = 20
# Number of worker processes in the scraping pool; roughly 40MB each.
NUM_THREADS = 16

# --- Per-instance limits ----------------------------------------------------
# Instances reporting fewer users than this (and not whitelisted) are treated as personal and not crawled further.
PERSONAL_INSTANCE_THRESHOLD = 10
# Page size requested from the public timeline endpoint.
MAX_STATUSES_PER_PAGE = 40
# Stop paging through an instance's timeline once this many statuses have been seen.
STATUS_SCRAPE_LIMIT = 5000

# --- Per-run limits ---------------------------------------------------------
# Maximum number of already-known instances queued per run (unless --unlimited is given).
# Note: newly discovered instances are not counted against this; they will always be crawled.
INSTANCE_SCRAPE_LIMIT = 50

class Command(BaseCommand):
    """Django management command that crawls instances, recording their info, peers and cross-instance mentions."""
    help = "Scrapes the entire fediverse"

    def add_arguments(self, parser):
        # Named (optional) arguments
        parser.add_argument(
            '--unlimited',
            action='store_true',
            dest='unlimited',
            help="Crawl all stale instances rather than limiting to {}".format(INSTANCE_SCRAPE_LIMIT),
        )
        parser.add_argument(
            '--all',
            action='store_true',
            dest='all',
            help="Crawl all instances rather than limiting to stale ones"
        )
        parser.add_argument(
            '--verbose',
            action='store_true',
            dest='verbose',
            help="Verbose logging"
        )
        parser.add_argument(
            '--instance',
            dest='instance',
            help="Crawl a single instance"
        )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.verbose = False
        self.scraped_count = 0
        # Instances listed here are crawled even when their user count marks them as personal instances
        # (see process_instance). One instance name per line; names are lowercased and stripped.
        whitelist_path = os.path.join(settings.BASE_DIR, '../whitelist.txt')
        with open(whitelist_path, 'r') as f:
            self.whitelist = [line.lower().strip() for line in f]

    def get_instance_info(self, instance_name: str):
        """Collect info about an instance from its /api/v1/instance endpoint.

        Returns the decoded JSON body. Raises InvalidResponseException if the response is not HTTP 200 or the
        body contains an ``error`` key. Network errors and undecodable bodies propagate to the caller.
        """
        url = 'https://' + instance_name + '/api/v1/instance'
        response = requests.get(url, timeout=TIMEOUT)
        # Named `info` (not `json`) so the stdlib json module isn't shadowed here.
        info = response.json()
        if response.status_code != 200 or get_key(info, ['error']):
            if self.verbose:
                log(self, "Couldn't get instance info for {}: {}".format(instance_name, response), error=True)
            raise InvalidResponseException("Could not get info for {}".format(instance_name))
        return info

    def get_instance_peers(self, instance_name: str):
        """Collect the instance's connected peers.

        Returns a de-duplicated list of lowercased peer names, excluding empty/null/non-string entries and the
        instance itself. Raises InvalidResponseException if the response is not HTTP 200 or is not a list.
        """
        # The peers endpoint returns a "list of all domain names known to this instance"
        # (https://github.com/tootsuite/mastodon/pull/6125)
        url = 'https://' + instance_name + '/api/v1/instance/peers'
        response = requests.get(url, timeout=TIMEOUT)
        peers = response.json()
        if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
            if self.verbose:
                log(self, "Couldn't get peers for {}: {}".format(instance_name, response), error=True)
            raise InvalidResponseException("Could not get peers for {}".format(instance_name))
        # Lowercase *before* comparing against our own name, so a differently-cased self entry is still dropped.
        # Non-string entries (e.g. null) are skipped instead of crashing on .lower().
        own_name = instance_name.lower()
        names = (peer.lower() for peer in peers if isinstance(peer, str) and peer)
        return list({name for name in names if name != own_name})

    def get_statuses(self, instance_name: str):
        """Collect recent statuses that mention users on other instances.

        Pages through the instance's local public timeline using ``max_id``. Stops on an empty page, once a page's
        last status is older than 31 days, or once STATUS_SCRAPE_LIMIT statuses have been seen.

        Returns ``(mentions_by_instance, statuses_seen)``. ``mentions_by_instance`` maps each lowercased remote
        instance name to its number of mentions (from statuses newer than 31 days). Raises
        InvalidResponseException if a page is not HTTP 200 or not a list.
        """
        # Module-level `timezone` is django.utils.timezone (it shadows datetime.timezone), so use the stdlib one
        # explicitly to get an aware UTC "now" comparable with the parsed `created_at` values.
        from datetime import timezone as dt_timezone

        own_name = instance_name.lower()
        mentions = []
        datetime_threshold = datetime.now(dt_timezone.utc) - timedelta(days=31)
        statuses_seen = 0
        # We'll ask for lots of statuses, but Mastodon never returns more than 40. Some Pleroma instances will ignore
        # the limit and return 20.
        url = 'https://{}/api/v1/timelines/public?local=true&limit={}'.format(instance_name, MAX_STATUSES_PER_PAGE)
        while True:
            if self.verbose:
                log(self, "({} posts seen)\tGetting {}".format(statuses_seen, url))
            response = requests.get(url, timeout=TIMEOUT)
            statuses = response.json()
            if response.status_code != 200 or not isinstance(statuses, list) or get_key(statuses, ['error']):
                if self.verbose:
                    log(self, "Couldn't get statuses for {}: {}".format(instance_name, response), error=True)
                raise InvalidResponseException("Could not get statuses for {}".format(instance_name))
            if not statuses:
                # An empty page means there is nothing further back in the timeline
                break
            # Get mentions from the statuses on this page that fall inside the time window
            mentions.extend(seq(statuses)
                            .filter(lambda s: datetime_parser(s['created_at']) > datetime_threshold)
                            .flat_map(lambda s: s['mentions']))
            # The last status on the page is treated as the oldest one seen so far
            earliest_status = statuses[-1]
            earliest_time_seen = datetime_parser(earliest_status['created_at'])
            statuses_seen += len(statuses)
            if earliest_time_seen < datetime_threshold or statuses_seen >= STATUS_SCRAPE_LIMIT:
                break
            # Continuing, so request the page of statuses older than the oldest one we've seen
            max_id = earliest_status['id']
            url = 'https://{}/api/v1/timelines/public?local=true&limit={}&max_id={}'.format(
                instance_name, MAX_STATUSES_PER_PAGE, max_id)
            time.sleep(2)  # Sleep to avoid overloading the instance

        # Mentions of local accounts have no "@domain" part. Compare the full domain rather than using endswith(),
        # which would also drop subdomains such as "user@sub.<instance_name>". Domains are lowercased to match
        # the lowercased peer names used as relationship targets.
        mentions_seq = (seq(mentions)
                        .map(lambda m: m['acct'])
                        .filter(lambda acct: '@' in acct)
                        .map(lambda acct: acct.split('@')[-1].lower())  # map to instance name
                        .filter(lambda domain: domain != own_name)
                        .map(lambda domain: (domain, 1))
                        .reduce_by_key(lambda x, y: x + y))  # sequence of tuples (instance, count)
        mentions_by_instance = {domain: count for domain, count in mentions_seq}  # instance -> number of mentions
        return mentions_by_instance, statuses_seen

    def process_instance(self, instance: Instance):
        """Given an instance, get all the data we're interested in.

        Always returns a dict with ``instance_name`` and ``status``. On success, ``status`` is ``'success'`` and the
        dict also holds ``info``, ``peers``, ``mentions`` and ``statuses_seen``. If one of the caught exceptions
        stops the crawl, ``status`` is that exception's class name; any keys gathered before the failure are kept.
        """
        data = {'instance_name': instance.name}
        try:
            data['info'] = self.get_instance_info(instance.name)
            # Check if this is a personal instance before making any more requests
            user_count = get_key(data, ['info', 'stats', 'user_count'])
            if (isinstance(user_count, int)
                    and user_count < PERSONAL_INSTANCE_THRESHOLD
                    and instance.name not in self.whitelist):
                raise PersonalInstanceException
            data['peers'] = self.get_instance_peers(instance.name)
            if not data['info'] and not data['peers']:
                # We got a response from the instance, but it didn't have any of the information we were expecting.
                raise InvalidResponseException
            data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
            data['status'] = 'success'
            return data
        except (InvalidResponseException,
                PersonalInstanceException,
                requests.exceptions.RequestException,
                json.decoder.JSONDecodeError) as e:
            data['status'] = type(e).__name__
            return data

    @db.transaction.atomic
    @require_lock(Instance, 'ACCESS EXCLUSIVE')
    def save_data(self, instance, data, queue, existing_instance_ids):
        """Save the result of process_instance() for ``instance``.

        Updates the instance row, creates rows for newly discovered peers and enqueues them, creates missing
        PeerRelationships, and stores mention counts. Runs inside ``db.transaction.atomic`` under
        ``require_lock(Instance, 'ACCESS EXCLUSIVE')``.
        """
        now = timezone.now()
        # Validate the ints. Some servers that appear to be fake instances have e.g. negative numbers here.
        instance.domain_count = validate_int(get_key(data, ['info', 'stats', 'domain_count']))
        instance.status_count = validate_int(get_key(data, ['info', 'stats', 'status_count']))
        instance.user_count = validate_int(get_key(data, ['info', 'stats', 'user_count']))
        instance.description = get_key(data, ['info', 'description'])
        instance.version = get_key(data, ['info', 'version'])
        instance.status = get_key(data, ['status'])
        instance.last_updated = now
        instance.save()

        if data['status'] == 'success' and data['peers']:
            # TODO: handle a peer disappearing
            # Snapshot the shared list with one proxy call (a slice). `in` on the manager proxy itself costs an
            # IPC round-trip per test. In this file the list is only extended here (under the lock above) or
            # before the workers start.
            known_instance_ids = set(existing_instance_ids[:])
            # Create instances for the peers we haven't seen before and add them to the queue
            new_instance_ids = [peer_id for peer_id in data['peers'] if peer_id not in known_instance_ids]
            # bulk_create doesn't call save(), so the auto_now_add field won't get set automatically.
            # last_updated starts at the epoch so the new rows also match handle()'s staleness filter.
            new_instances = [Instance(name=peer_id, first_seen=now, last_updated=datetime.utcfromtimestamp(0))
                             for peer_id in new_instance_ids]
            existing_instance_ids.extend(new_instance_ids)
            Instance.objects.bulk_create(new_instances)
            for new_instance in new_instances:
                queue.put(new_instance)

            # Create relationships we haven't seen before. Materialize the existing targets once as a set
            # instead of scanning the QuerySet for every peer.
            existing_peer_ids = set(PeerRelationship.objects.filter(source=instance).values_list('target', flat=True))
            new_peer_ids = [peer_id for peer_id in data['peers'] if peer_id not in existing_peer_ids]
            if new_peer_ids:
                new_relationships = [PeerRelationship(source=instance, target_id=peer_id, first_seen=now)
                                     for peer_id in new_peer_ids]
                PeerRelationship.objects.bulk_create(new_relationships)

        if data['status'] == 'success' and data['mentions']:
            # At this point, we can assume that a relationship exists for every peer that's mentioned in statuses
            mentions = data['mentions']
            relationships = list(PeerRelationship.objects.filter(source=instance,
                                                                 target_id__in=list(mentions.keys())))
            for relationship in relationships:
                relationship.mention_count = mentions[relationship.target_id]
                relationship.statuses_seen = data['statuses_seen']
                relationship.last_updated = now
            bulk_update(relationships, update_fields=['mention_count', 'statuses_seen', 'last_updated'])
        log(self, "Processed {}: {}".format(data['instance_name'], data['status']))

    def worker(self, queue: mp.JoinableQueue, existing_instance_ids, scraped_ids):
        """The main worker loop that processes instances; it runs forever as the pool's initializer.

        Every item taken from the queue is marked done exactly once, even if processing it raises. If an exception
        escaped this loop, the process would exit without calling ``task_done()`` and ``queue.join()`` in
        handle() would wait forever.
        """
        db.connections.close_all()  # https://stackoverflow.com/a/38356519/3697202
        while True:
            instance = queue.get()
            try:
                if instance.name in scraped_ids:
                    # If we hit this branch, it's indicative of a bug
                    log(self, "Skipping {}, already done. This should not have been added to the queue!".format(instance),
                        error=True)
                    continue
                # Fetch data on instance
                log(self, "Processing {}".format(instance.name))
                data = self.process_instance(instance)
                self.save_data(instance, data, queue, existing_instance_ids)
                scraped_ids[instance.name] = 1
            except Exception as e:
                # Log and move on to the next instance rather than letting the worker process die
                log(self, "Failed to process {}: {}".format(instance.name, e), error=True)
            finally:
                queue.task_done()

    def handle(self, *args, **options):
        start_time = time.time()
        self.verbose = options['verbose']
        if options['instance']:
            stale_instance, _ = Instance.objects.get_or_create(name=options['instance'])
            stale_instances = [stale_instance]
        elif options['all']:
            stale_instances = Instance.objects.all()
        else:
            # Use timezone.now(), the same clock save_data uses for last_updated
            stale_instances = Instance.objects.filter(last_updated__lte=timezone.now() - timedelta(days=1))
        if not options['unlimited']:
            stale_instances = stale_instances[:INSTANCE_SCRAPE_LIMIT]

        with mp.Manager() as manager:
            # Share the list of existing instances amongst all workers (to avoid each one having to query
            # for it on every instance it scrapes)
            existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
            scraped_ids = manager.dict()
            queue = mp.JoinableQueue()
            if stale_instances:
                for instance in stale_instances:
                    queue.put(instance)
            elif not Instance.objects.exists():
                instance, _ = Instance.objects.get_or_create(name=SEED)
                existing_instance_ids.append(instance.name)
                queue.put(instance)

            # Each pool process runs self.worker as its initializer, which loops forever pulling from `queue`.
            # Leaving the `with` block terminates those processes once every queued instance is marked done.
            with mp.Pool(NUM_THREADS, initializer=self.worker,
                         initargs=(queue, existing_instance_ids, scraped_ids)):
                queue.join()
            self.scraped_count = len(scraped_ids.keys())

        end_time = time.time()
        log(self, "Scraped {} instances in {:.0f}s".format(self.scraped_count, end_time - start_time), True)