diff --git a/apiv1/views.py b/apiv1/views.py
index 1bbd6fd..128a755 100644
--- a/apiv1/views.py
+++ b/apiv1/views.py
@@ -1,5 +1,4 @@
 from rest_framework import viewsets
-from django.db.models import Prefetch
 from scraper.models import Instance, PeerRelationship
 from apiv1.serializers import InstanceListSerializer, InstanceDetailSerializer, NodeSerializer, EdgeSerializer
 
@@ -25,7 +24,7 @@ class EdgeView(viewsets.ReadOnlyModelViewSet):
     """
     Endpoint to get a list of the graph's edges in a SigmaJS-friendly format.
     """
-    queryset = PeerRelationship.objects.all()[:1000]
+    queryset = PeerRelationship.objects.filter(source__status='success', target__status='success')
     serializer_class = EdgeSerializer
diff --git a/requirements.txt b/requirements.txt
index f393611..c2acce3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,19 +1,26 @@
 autopep8==1.3.5
 certifi==2018.8.24
 chardet==3.0.4
+dill==0.2.5
 Django==2.1
+django-bulk-update==2.2.0
+django-cors-headers==2.4.0
+django-letsencrypt==3.0.1
 django-silk==3.0.1
 djangorestframework==3.8.2
+future==0.16.0
 gprof2dot==2016.10.13
 idna==2.7
 Jinja2==2.10
 MarkupSafe==1.0
 psycopg2-binary==2.7.5
 pycodestyle==2.4.0
+PyFunctional==1.1.3
 Pygments==2.2.0
 python-dateutil==2.7.3
 pytz==2018.5
 requests==2.19.1
-six==1.11.0
+six==1.10.0
 sqlparse==0.2.4
+tabulate==0.7.7
 urllib3==1.23
diff --git a/scraper/management/commands/_util.py b/scraper/management/commands/_util.py
index 880cb3f..80c1871 100644
--- a/scraper/management/commands/_util.py
+++ b/scraper/management/commands/_util.py
@@ -53,7 +53,7 @@ def get_key(data, keys: list):
         while keys:
             val = val[keys.pop(0)]
         return val
-    except KeyError:
+    except (KeyError, TypeError):
         return ''
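Note on the `get_key` change: `KeyError` only covers a key missing from a dict. When an intermediate value is `None`, a scalar, or a list indexed with a string, the subscript raises `TypeError` instead, which previously escaped the helper. A minimal sketch for context — the hunk above shows only the tail of the function, so the surrounding lines here are reconstructed from the visible context and may not match the source exactly:

```python
def get_key(data, keys: list):
    """Walk `keys` into nested JSON, returning '' if the path doesn't exist."""
    try:
        val = data
        while keys:
            val = val[keys.pop(0)]
        return val
    except (KeyError, TypeError):
        return ''

get_key({'urls': {}}, ['urls', 'streaming_api'])    # KeyError path  -> ''
get_key({'urls': None}, ['urls', 'streaming_api'])  # TypeError path -> '' (previously raised)
get_key([], ['error'])                              # list indexed by str -> '' (previously raised)
```

The last case is what makes the new `get_key(peers, ['error'])` and `get_key(statuses, ['error'])` probes in scrape.py safe even when the response body is a list rather than an error dict.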
diff --git a/scraper/management/commands/scrape.py b/scraper/management/commands/scrape.py
index 40979c3..9c07ff7 100644
--- a/scraper/management/commands/scrape.py
+++ b/scraper/management/commands/scrape.py
@@ -7,7 +7,10 @@ import json
 import multiprocessing as mp
 import requests
 import time
-from datetime import datetime, timedelta
+from dateutil.parser import parse as datetime_parser
+from datetime import datetime, timedelta, timezone
+from functional import seq
+from django_bulk_update.helper import bulk_update
 from django.core.management.base import BaseCommand
 from django import db
 from scraper.models import Instance, PeerRelationship
@@ -28,7 +31,7 @@ from scraper.management.commands._util import require_lock, InvalidResponseError
 
 SEED = 'mastodon.social'
 TIMEOUT = 10
-NUM_THREADS = 4
+NUM_THREADS = 64
 
 
 class Command(BaseCommand):
@@ -36,16 +39,17 @@ class Command(BaseCommand):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.done_bag = set()
+        self.scraped_ids = set()
 
     @staticmethod
     def get_instance_info(instance_name: str):
         """Collect info about instance"""
         url = 'https://' + instance_name + '/api/v1/instance'
         response = requests.get(url, timeout=TIMEOUT)
-        if response.status_code != 200:
+        json = response.json()
+        if response.status_code != 200 or get_key(json, ['error']):
             raise InvalidResponseError("Could not get info for {}".format(instance_name))
-        return response.json()
+        return json
 
     @staticmethod
     def get_instance_peers(instance_name: str):
@@ -54,10 +58,52 @@ class Command(BaseCommand):
         # (https://github.com/tootsuite/mastodon/pull/6125)
         url = 'https://' + instance_name + '/api/v1/instance/peers'
         response = requests.get(url, timeout=TIMEOUT)
-        json = response.json()
-        if response.status_code != 200 or not isinstance(json, list):
+        peers = response.json()
+        if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
             raise InvalidResponseError("Could not get peers for {}".format(instance_name))
-        return json
+        # Get rid of peers that just say "null" and the instance itself
+        return [peer for peer in peers if peer and peer != instance_name]
+
+    @staticmethod
+    def get_statuses(instance_name: str):
+        """Collect all statuses that mention users on other instances"""
+        mentions = []
+        datetime_threshold = datetime.now(timezone.utc) - timedelta(weeks=1)
+        statuses_seen = 0
+        # We'll ask for 1000 statuses, but Mastodon never returns more than 40. Some Pleroma instances will ignore
+        # the limit and return 20.
+        url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000'
+        while True:
+            response = requests.get(url, timeout=TIMEOUT)
+            statuses = response.json()
+            if response.status_code != 200 or get_key(statuses, ['error']):
+                raise InvalidResponseError("Could not get statuses for {}".format(instance_name))
+            elif len(statuses) == 0:
+                break
+            # Get mentions from this instance
+            mentions.extend((seq(statuses)
+                             .filter(lambda s: datetime_parser(s['created_at']) > datetime_threshold)
+                             .flat_map(lambda s: s['mentions'])))  # map to mentions
+
+            # Find out if we should stop here
+            earliest_status = statuses[-1]
+            earliest_time_seen = datetime_parser(earliest_status['created_at'])
+            statuses_seen += len(statuses)
+            # Mastodon returns max 40 statuses; if we ever see less than that we know there aren't any more
+            if earliest_time_seen < datetime_threshold or statuses_seen >= 2000:
+                break
+            # Continuing, so get url for next page
+            min_id = earliest_status['id']
+            url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000&max_id=' + min_id
+
+        mentions_seq = (seq(mentions)
+                        .filter(lambda m: not m['acct'].endswith(instance_name) and '@' in m['acct'])
+                        .map(lambda m: m['acct'].split('@')[-1])  # map to instance name
+                        .map(lambda m: (m, 1))
+                        .reduce_by_key(lambda x, y: x + y))  # sequence of tuples (instance, count)
+        mentions_by_instance = {t[0]: t[1] for t in mentions_seq}  # dict of instance -> number of mentions
+
+        return mentions_by_instance, statuses_seen
 
     def process_instance(self, instance: Instance):
         """Given an instance, get all the data we're interested in"""
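Note on `get_statuses`: it pages backwards through the public local timeline via `max_id` until it crosses the one-week threshold, runs out of statuses, or hits the 2000-status cap, and the closing PyFunctional pipeline is just a word count over the home instances of mentioned accounts. For readers who don't know `seq`, a standard-library equivalent of that pipeline — the sample mentions below are invented for illustration:

```python
from collections import Counter

instance_name = 'example.social'     # hypothetical instance being scraped
mentions = [                         # invented sample of status['mentions'] entries
    {'acct': 'alice@mastodon.social'},
    {'acct': 'bob@pleroma.site'},
    {'acct': 'carol@mastodon.social'},
    {'acct': 'dave'},                # local mention, no '@': filtered out
    {'acct': 'eve@example.social'},  # mention of the instance itself: filtered out
]

mentions_by_instance = dict(Counter(
    m['acct'].split('@')[-1]         # map each mention to its home instance
    for m in mentions
    if '@' in m['acct'] and not m['acct'].endswith(instance_name)
))
print(mentions_by_instance)  # {'mastodon.social': 2, 'pleroma.site': 1}
```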
@@ -65,11 +111,11 @@ class Command(BaseCommand):
         try:
             data['instance_name'] = instance.name
             data['info'] = self.get_instance_info(instance.name)
-            # Get rid of peers that just say "null" and the instance itself
-            data['peers'] = [peer for peer in self.get_instance_peers(instance.name) if peer and peer != instance.name]
+            data['peers'] = self.get_instance_peers(instance.name)
             if not data['info'] and not data['peers']:
                 # We got a response from the instance, but it didn't have any of the information we were expecting.
                 raise InvalidResponseError
+            data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
             data['status'] = 'success'
             return data
         except (InvalidResponseError,
@@ -98,9 +144,7 @@ class Command(BaseCommand):
             # bulk_create doesn't call save(), so the auto_now_add field won't get set automatically
             new_instances = [Instance(name=id, first_seen=datetime.now(), last_updated=datetime.now())
                              for id in new_instance_ids]
-            print("Before: {}".format(len(existing_instance_ids)))
             existing_instance_ids.extend(new_instance_ids)
-            print("After: {}".format(len(existing_instance_ids)))
             Instance.objects.bulk_create(new_instances)
             for new_instance in new_instances:
                 queue.put(new_instance)
@@ -113,6 +157,18 @@ class Command(BaseCommand):
             new_relationships = [PeerRelationship(source=instance, target_id=new_peer, first_seen=datetime.now())
                                  for new_peer in new_peer_ids]
             PeerRelationship.objects.bulk_create(new_relationships)
+
+        if data['status'] == 'success' and data['mentions']:
+            # At this point, we can assume that a relationship exists for every peer that's mentioned in statuses
+            mentions = data['mentions']
+            relationships = PeerRelationship.objects.filter(source=instance,
+                                                            target_id__in=list(mentions.keys()))
+            for relationship in relationships:
+                relationship.mention_count = mentions[relationship.target_id]
+                relationship.statuses_seen = data['statuses_seen']
+                relationship.last_updated = datetime.now()
+            bulk_update(relationships, update_fields=['mention_count', 'statuses_seen', 'last_updated'])
+
         self.stdout.write(log("Saved {}".format(data['instance_name'])))
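Note on the mention-statistics block: this is why `django-bulk-update` enters requirements.txt. With thousands of peers per instance, calling `save()` in a loop would issue one UPDATE per relationship, while the helper batches everything into a single UPDATE built from CASE expressions. A sketch of the trade-off, reusing the names from `save_data` above rather than standing alone:

```python
# One round-trip per row -- the pattern the helper replaces:
for relationship in relationships:
    relationship.mention_count = mentions[relationship.target_id]
    relationship.statuses_seen = data['statuses_seen']
    relationship.last_updated = datetime.now()
    relationship.save(update_fields=['mention_count', 'statuses_seen', 'last_updated'])

# One round-trip total -- django_bulk_update emits a single UPDATE ... CASE query:
bulk_update(relationships, update_fields=['mention_count', 'statuses_seen', 'last_updated'])
```

(Django 2.2 later gained a built-in `QuerySet.bulk_update()` with a similar shape; this project pins Django==2.1, hence the third-party package.)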
@@ -121,33 +177,37 @@ class Command(BaseCommand):
         db.connections.close_all()
         while True:
             instance = queue.get()
-            if instance in self.done_bag:
-                self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!".format(instance)))
+            if instance.name in self.scraped_ids:
+                self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!"
+                                      .format(instance)))
                 queue.task_done()
             else:
                 # Fetch data on instance
                 self.stdout.write(log("Processing {}".format(instance.name)))
                 data = self.process_instance(instance)
                 self.save_data(instance, data, queue, existing_instance_ids)
-                self.done_bag.add(instance)
+                self.scraped_ids.add(instance.name)
                 queue.task_done()
 
     def handle(self, *args, **options):
         start_time = time.time()
-        stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(weeks=1))
+        stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(days=1))
         with mp.Manager() as manager:
             # Share the list of existing instances amongst all threads (to avoid each thread having to query
             # for it on every instance it scrapes)
             existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
             queue = mp.JoinableQueue()
             if stale_instances:
-                queue.put(list(stale_instances))
+                for instance in stale_instances:
+                    queue.put(instance)
             elif not Instance.objects.exists():
                 instance, _ = Instance.objects.get_or_create(name=SEED)
-                queue.put(instance)
                 existing_instance_ids.append(instance.name)
+                queue.put(instance)
             pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids))
             queue.join()
+
             end_time = time.time()
-            self.stdout.write(self.style.SUCCESS(log("Successfully scraped the fediverse in {:.0f}s".format(end_time-start_time))))
+            self.stdout.write(self.style.SUCCESS(log("Scraped {} instances in {:.0f}s"
+                                                     .format(len(self.scraped_ids), end_time - start_time))))
diff --git a/scraper/migrations/0001_initial.py b/scraper/migrations/0001_initial.py
index d1daaac..04fdea1 100644
--- a/scraper/migrations/0001_initial.py
+++ b/scraper/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 2.1 on 2018-09-01 14:00
+# Generated by Django 2.1 on 2018-09-01 22:28
 
 from django.db import migrations, models
 import django.db.models.deletion
@@ -30,7 +30,10 @@ class Migration(migrations.Migration):
             name='PeerRelationship',
             fields=[
                 ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('mention_count', models.IntegerField(blank=True, null=True)),
+                ('statuses_seen', models.IntegerField(blank=True, null=True)),
                 ('first_seen', models.DateTimeField(auto_now_add=True)),
+                ('last_updated', models.DateTimeField(auto_now=True)),
                 ('source', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='following_relationship', to='scraper.Instance')),
                 ('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='follower_relationships', to='scraper.Instance')),
             ],
diff --git a/scraper/models.py b/scraper/models.py
index 9c1649a..004bce0 100644
--- a/scraper/models.py
+++ b/scraper/models.py
@@ -31,5 +31,10 @@ class PeerRelationship(models.Model):
     source = models.ForeignKey(Instance, related_name="following_relationship", on_delete=models.CASCADE)
     target = models.ForeignKey(Instance, related_name="follower_relationships", on_delete=models.CASCADE)
 
+    # Interaction stats
+    mention_count = models.IntegerField(blank=True, null=True)
+    statuses_seen = models.IntegerField(blank=True, null=True)  # in case we want mention_count as a ratio
+
+    # Metadata
     first_seen = models.DateTimeField(auto_now_add=True)
+    last_updated = models.DateTimeField(auto_now=True)
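Note on the `handle()`/`worker()` structure: this is a producer-consumer crawl in which the workers are also the producers, since `save_data()` enqueues every newly discovered instance. `queue.join()` blocks until every item ever `put()` has been matched by a `task_done()`, so the run ends exactly when the crawl frontier is exhausted. A minimal self-contained sketch of the pattern, with an invented in-memory peer map standing in for the HTTP calls (assumes the default fork start method; all names here are illustrative):

```python
import multiprocessing as mp

# Toy stand-in for the /api/v1/instance/peers responses (invented data).
FAKE_PEERS = {'a': ['b', 'c'], 'b': ['c'], 'c': []}


def worker(queue, existing):
    """Consume names from the queue, feeding newly discovered peers back into it."""
    while True:
        name = queue.get()
        for peer in FAKE_PEERS[name]:
            if peer not in existing:   # not atomic across processes, so duplicates are
                existing.append(peer)  # possible -- hence the real worker()'s re-check
                queue.put(peer)        # against scraped_ids before processing
        queue.task_done()


if __name__ == '__main__':
    with mp.Manager() as manager:
        existing = manager.list(['a'])  # shared "already known" list, like existing_instance_ids
        queue = mp.JoinableQueue()
        queue.put('a')                  # seed, like SEED = 'mastodon.social'
        pool = mp.Pool(2, initializer=worker, initargs=(queue, existing))
        queue.join()                    # returns once every put() has been task_done()'d
        print(sorted(set(existing)))    # ['a', 'b', 'c']
```

One caveat this sketch makes visible: the pool workers are separate processes, so each one has its own copy of `self.scraped_ids`; only `existing_instance_ids`, which goes through the `Manager`, is actually shared. The `len(self.scraped_ids)` in the final log line therefore reflects only the parent process's view.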