scrape recent statuses for mentions of other instances

Tao Bojlen 2018-09-03 00:04:03 +02:00
parent d383bca0d8
commit a176e35ec2
6 changed files with 98 additions and 24 deletions

View file

@@ -1,5 +1,4 @@
 from rest_framework import viewsets
-from django.db.models import Prefetch
 from scraper.models import Instance, PeerRelationship
 from apiv1.serializers import InstanceListSerializer, InstanceDetailSerializer, NodeSerializer, EdgeSerializer
@@ -25,7 +24,7 @@ class EdgeView(viewsets.ReadOnlyModelViewSet):
     """
     Endpoint to get a list of the graph's edges in a SigmaJS-friendly format.
     """
-    queryset = PeerRelationship.objects.all()[:1000]
+    queryset = PeerRelationship.objects.filter(source__status='success', target__status='success')
     serializer_class = EdgeSerializer
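Note on this hunk: the old queryset capped the graph at an arbitrary 1,000 edges; the new one returns every edge whose endpoints were both crawled successfully. For reference, the resulting view reads roughly as below. This is a sketch; the select_related() call is an extra optimization assumed here (fetching both endpoints of each edge in one query), not part of the commit.

    from rest_framework import viewsets

    from apiv1.serializers import EdgeSerializer
    from scraper.models import PeerRelationship


    class EdgeView(viewsets.ReadOnlyModelViewSet):
        """
        Endpoint to get a list of the graph's edges in a SigmaJS-friendly format.
        """
        # Keep only edges between successfully-crawled instances, rather than
        # an arbitrary slice of the whole table.
        queryset = (PeerRelationship.objects
                    .filter(source__status='success', target__status='success')
                    .select_related('source', 'target'))  # assumed optimization, not in the commit
        serializer_class = EdgeSerializer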

View file

@@ -1,19 +1,26 @@
 autopep8==1.3.5
 certifi==2018.8.24
 chardet==3.0.4
+dill==0.2.5
 Django==2.1
+django-bulk-update==2.2.0
+django-cors-headers==2.4.0
+django-letsencrypt==3.0.1
 django-silk==3.0.1
 djangorestframework==3.8.2
+future==0.16.0
 gprof2dot==2016.10.13
 idna==2.7
 Jinja2==2.10
 MarkupSafe==1.0
 psycopg2-binary==2.7.5
 pycodestyle==2.4.0
+PyFunctional==1.1.3
 Pygments==2.2.0
 python-dateutil==2.7.3
 pytz==2018.5
 requests==2.19.1
-six==1.11.0
+six==1.10.0
 sqlparse==0.2.4
+tabulate==0.7.7
 urllib3==1.23

View file

@@ -53,7 +53,7 @@ def get_key(data, keys: list):
         while keys:
             val = val[keys.pop(0)]
         return val
-    except KeyError:
+    except (KeyError, TypeError):
         return ''
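Only the tail of get_key is visible in this hunk. Reconstructed from the visible lines, the whole helper presumably looks like the sketch below (the try: and the initial val = data are inferred). Catching TypeError matters because these API responses aren't always dicts: indexing a list or None with a string key raises TypeError, not KeyError.

    def get_key(data, keys: list):
        """Walk nested dicts, returning '' if the path is missing or untraversable."""
        try:
            val = data  # inferred; not visible in the hunk
            while keys:
                val = val[keys.pop(0)]
            return val
        except (KeyError, TypeError):
            # TypeError covers non-dict responses, e.g. get_key(['a', 'b'], ['error'])
            return ''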

View file

@@ -7,7 +7,10 @@ import json
 import multiprocessing as mp
 import requests
 import time
-from datetime import datetime, timedelta
+from dateutil.parser import parse as datetime_parser
+from datetime import datetime, timedelta, timezone
+from functional import seq
+from django_bulk_update.helper import bulk_update
 from django.core.management.base import BaseCommand
 from django import db
 from scraper.models import Instance, PeerRelationship
@@ -28,7 +31,7 @@ from scraper.management.commands._util import require_lock, InvalidResponseError
 SEED = 'mastodon.social'
 TIMEOUT = 10
-NUM_THREADS = 4
+NUM_THREADS = 64


 class Command(BaseCommand):
@@ -36,16 +39,17 @@ class Command(BaseCommand):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.done_bag = set()
+        self.scraped_ids = set()

     @staticmethod
     def get_instance_info(instance_name: str):
         """Collect info about instance"""
         url = 'https://' + instance_name + '/api/v1/instance'
         response = requests.get(url, timeout=TIMEOUT)
-        if response.status_code != 200:
+        json = response.json()
+        if response.status_code != 200 or get_key(json, ['error']):
             raise InvalidResponseError("Could not get info for {}".format(instance_name))
-        return response.json()
+        return json

     @staticmethod
     def get_instance_peers(instance_name: str):
@@ -54,10 +58,52 @@ class Command(BaseCommand):
         # (https://github.com/tootsuite/mastodon/pull/6125)
         url = 'https://' + instance_name + '/api/v1/instance/peers'
         response = requests.get(url, timeout=TIMEOUT)
-        json = response.json()
-        if response.status_code != 200 or not isinstance(json, list):
+        peers = response.json()
+        if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
             raise InvalidResponseError("Could not get peers for {}".format(instance_name))
-        return json
+        # Get rid of peers that just say "null" and the instance itself
+        return [peer for peer in peers if peer and peer != instance_name]
+
+    @staticmethod
+    def get_statuses(instance_name: str):
+        """Collect all statuses that mention users on other instances"""
+        mentions = []
+        datetime_threshold = datetime.now(timezone.utc) - timedelta(weeks=1)
+        statuses_seen = 0
+        # We'll ask for 1000 statuses, but Mastodon never returns more than 40. Some Pleroma instances will
+        # ignore the limit and return 20.
+        url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000'
+        while True:
+            response = requests.get(url, timeout=TIMEOUT)
+            statuses = response.json()
+            if response.status_code != 200 or get_key(statuses, ['error']):
+                raise InvalidResponseError("Could not get statuses for {}".format(instance_name))
+            elif len(statuses) == 0:
+                break
+            # Get mentions from this instance
+            mentions.extend((seq(statuses)
+                             .filter(lambda s: datetime_parser(s['created_at']) > datetime_threshold)
+                             .flat_map(lambda s: s['mentions'])))  # map to mentions
+            # Find out if we should stop here
+            earliest_status = statuses[-1]
+            earliest_time_seen = datetime_parser(earliest_status['created_at'])
+            statuses_seen += len(statuses)
+            # Mastodon returns max 40 statuses; if we ever see less than that, we know there aren't any more
+            if earliest_time_seen < datetime_threshold or statuses_seen >= 2000:
+                break
+            # Continuing, so get the URL for the next (older) page
+            max_id = earliest_status['id']
+            url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000&max_id=' + max_id
+
+        mentions_seq = (seq(mentions)
+                        .filter(lambda m: not m['acct'].endswith(instance_name) and '@' in m['acct'])
+                        .map(lambda m: m['acct'].split('@')[-1])  # map to instance name
+                        .map(lambda m: (m, 1))
+                        .reduce_by_key(lambda x, y: x + y))  # sequence of (instance, count) tuples
+        mentions_by_instance = {t[0]: t[1] for t in mentions_seq}  # dict of instance -> number of mentions
+
+        return mentions_by_instance, statuses_seen
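The PyFunctional pipeline at the end of get_statuses is equivalent to a few lines of stdlib Python. Here is a sketch of the same aggregation with collections.Counter, assuming the Mastodon mention shape where acct is 'user' for local accounts and 'user@host' for remote ones:

    from collections import Counter

    def count_mentions_by_instance(mentions, instance_name):
        """Stdlib equivalent of the seq(...) pipeline: instance -> mention count."""
        return dict(Counter(
            m['acct'].split('@')[-1]                    # keep only the host part
            for m in mentions
            if '@' in m['acct']                         # skip local mentions (no host)
            and not m['acct'].endswith(instance_name)   # skip self-mentions
        ))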
     def process_instance(self, instance: Instance):
         """Given an instance, get all the data we're interested in"""
@@ -65,11 +111,11 @@ class Command(BaseCommand):
         try:
             data['instance_name'] = instance.name
             data['info'] = self.get_instance_info(instance.name)
-            # Get rid of peers that just say "null" and the instance itself
-            data['peers'] = [peer for peer in self.get_instance_peers(instance.name) if peer and peer != instance.name]
+            data['peers'] = self.get_instance_peers(instance.name)
             if not data['info'] and not data['peers']:
                 # We got a response from the instance, but it didn't have any of the information we were expecting.
                 raise InvalidResponseError
+            data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
             data['status'] = 'success'
             return data
         except (InvalidResponseError,
@@ -98,9 +144,7 @@ class Command(BaseCommand):
         # bulk_create doesn't call save(), so the auto_now_add field won't get set automatically
         new_instances = [Instance(name=id, first_seen=datetime.now(), last_updated=datetime.now())
                          for id in new_instance_ids]
-        print("Before: {}".format(len(existing_instance_ids)))
         existing_instance_ids.extend(new_instance_ids)
-        print("After: {}".format(len(existing_instance_ids)))
         Instance.objects.bulk_create(new_instances)
         for new_instance in new_instances:
             queue.put(new_instance)
@@ -113,6 +157,18 @@ class Command(BaseCommand):
         new_relationships = [PeerRelationship(source=instance, target_id=new_peer, first_seen=datetime.now())
                              for new_peer in new_peer_ids]
         PeerRelationship.objects.bulk_create(new_relationships)
+
+        if data['status'] == 'success' and data['mentions']:
+            # At this point, we can assume that a relationship exists for every peer that's mentioned in statuses
+            mentions = data['mentions']
+            relationships = PeerRelationship.objects.filter(source=instance,
+                                                            target_id__in=list(mentions.keys()))
+            for relationship in relationships:
+                relationship.mention_count = mentions[relationship.target_id]
+                relationship.statuses_seen = data['statuses_seen']
+                relationship.last_updated = datetime.now()
+            bulk_update(relationships, update_fields=['mention_count', 'statuses_seen', 'last_updated'])
+
         self.stdout.write(log("Saved {}".format(data['instance_name'])))

     def worker(self, queue: mp.JoinableQueue, existing_instance_ids):
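On the new django-bulk-update dependency: bulk_update() collapses the per-relationship saves into a single UPDATE (via CASE/WHEN), which matters once an instance mentions thousands of peers. This project pins Django 2.1; Django 2.2 later gained a built-in equivalent, so on newer versions the helper call above could become:

    # Built into Django 2.2+; equivalent to the django-bulk-update helper used above.
    PeerRelationship.objects.bulk_update(
        relationships, ['mention_count', 'statuses_seen', 'last_updated'])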
@@ -121,33 +177,37 @@ class Command(BaseCommand):
         db.connections.close_all()
         while True:
             instance = queue.get()
-            if instance in self.done_bag:
-                self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!".format(instance)))
+            if instance.name in self.scraped_ids:
+                self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!"
+                                      .format(instance)))
                 queue.task_done()
             else:
                 # Fetch data on instance
                 self.stdout.write(log("Processing {}".format(instance.name)))
                 data = self.process_instance(instance)
                 self.save_data(instance, data, queue, existing_instance_ids)
-                self.done_bag.add(instance)
+                self.scraped_ids.add(instance.name)
                 queue.task_done()

     def handle(self, *args, **options):
         start_time = time.time()
-        stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(weeks=1))
+        stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(days=1))
         with mp.Manager() as manager:
             # Share the list of existing instances amongst all threads (to avoid each thread having to query
             # for it on every instance it scrapes)
             existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
             queue = mp.JoinableQueue()
             if stale_instances:
-                queue.put(list(stale_instances))
+                for instance in stale_instances:
+                    queue.put(instance)
             elif not Instance.objects.exists():
                 instance, _ = Instance.objects.get_or_create(name=SEED)
-                queue.put(instance)
                 existing_instance_ids.append(instance.name)
+                queue.put(instance)
             pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids))
             queue.join()
             end_time = time.time()
-            self.stdout.write(self.style.SUCCESS(log("Successfully scraped the fediverse in {:.0f}s".format(end_time-start_time))))
+            self.stdout.write(self.style.SUCCESS(log("Scraped {} instances in {:.0f}s"
+                                                     .format(len(self.scraped_ids), end_time - start_time))))
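The crawl structure in handle() is a small work-queue pattern: worker processes consume from a JoinableQueue, may enqueue newly discovered instances, and queue.join() returns once every enqueued item has been matched by a task_done(). A stripped-down, Django-free sketch of the same pattern (process_one is a hypothetical stand-in for the real scrape):

    import multiprocessing as mp

    def process_one(name):
        """Hypothetical stand-in for the real scrape: returns newly found peers."""
        return []

    def worker(queue):
        # Each worker loops forever; the pool is torn down when the parent exits.
        while True:
            item = queue.get()
            try:
                for discovered in process_one(item):
                    queue.put(discovered)  # workers can grow the crawl frontier
            finally:
                queue.task_done()  # join() unblocks only when every put() is matched

    if __name__ == '__main__':
        queue = mp.JoinableQueue()
        queue.put('mastodon.social')  # seed instance
        pool = mp.Pool(4, initializer=worker, initargs=(queue,))
        queue.join()  # blocks until the crawl frontier is exhausted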

View file

@@ -1,4 +1,4 @@
-# Generated by Django 2.1 on 2018-09-01 14:00
+# Generated by Django 2.1 on 2018-09-01 22:28

 from django.db import migrations, models
 import django.db.models.deletion
@@ -30,7 +30,10 @@ class Migration(migrations.Migration):
             name='PeerRelationship',
             fields=[
                 ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('mention_count', models.IntegerField(blank=True, null=True)),
+                ('statuses_seen', models.IntegerField(blank=True, null=True)),
                 ('first_seen', models.DateTimeField(auto_now_add=True)),
+                ('last_updated', models.DateTimeField(auto_now=True)),
                 ('source', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='following_relationship', to='scraper.Instance')),
                 ('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='follower_relationships', to='scraper.Instance')),
             ],

View file

@@ -31,5 +31,10 @@ class PeerRelationship(models.Model):
     source = models.ForeignKey(Instance, related_name="following_relationship", on_delete=models.CASCADE)
     target = models.ForeignKey(Instance, related_name="follower_relationships", on_delete=models.CASCADE)

+    # Interaction stats
+    mention_count = models.IntegerField(blank=True, null=True)
+    statuses_seen = models.IntegerField(blank=True, null=True)  # in case we want mention_count as a ratio
+
     # Metadata
     first_seen = models.DateTimeField(auto_now_add=True)
+    last_updated = models.DateTimeField(auto_now=True)
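As the inline comment suggests, statuses_seen exists so mention_count can later be normalized. A hypothetical helper (not part of this commit) showing the intended ratio:

    def mention_ratio(relationship):
        """Hypothetical: mentions of the target per status scanned on the source."""
        if not relationship.statuses_seen:  # None or 0 -> no data yet
            return None
        return relationship.mention_count / relationship.statuses_seen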