From 074e2e1b885178c98330700814ee6317590dd0cf Mon Sep 17 00:00:00 2001
From: Tao Bojlen
Date: Sun, 26 Aug 2018 20:15:50 +0200
Subject: [PATCH] fix scraper

---
 scraper/management/commands/scrape.py         | 195 ++++++++++--------
 scraper/migrations/0001_initial.py            |  15 +-
 scraper/migrations/0002_auto_20180826_0053.py |  42 ----
 scraper/migrations/0003_auto_20180826_0057.py |  28 ---
 scraper/migrations/0004_auto_20180826_0100.py |  18 --
 5 files changed, 112 insertions(+), 186 deletions(-)
 delete mode 100644 scraper/migrations/0002_auto_20180826_0053.py
 delete mode 100644 scraper/migrations/0003_auto_20180826_0057.py
 delete mode 100644 scraper/migrations/0004_auto_20180826_0100.py

diff --git a/scraper/management/commands/scrape.py b/scraper/management/commands/scrape.py
index 8d87b1b..6146481 100644
--- a/scraper/management/commands/scrape.py
+++ b/scraper/management/commands/scrape.py
@@ -1,11 +1,12 @@
 """
 This script starts at a seed instance and loads the list of connected
-peers. From there, it slowly scrapes the peers of all instances it finds,
+peers. From there, it scrapes the peers of all instances it finds,
 gradually mapping the fediverse.
 """
 import json
 import multiprocessing
 import requests
+import time
 from django.core.management.base import BaseCommand
 from scraper.models import Instance, InstanceStats
 
@@ -20,9 +21,10 @@ from scraper.models import Instance, InstanceStats
 # Change SEED to start from a different instance.                      #
 #                                                                      #
 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances
+
 SEED = 'mastodon.social'
-THREADS = 100
-TIMEOUT = 10
+TIMEOUT = 20
 
 
 class InvalidResponseError(Exception):
@@ -40,98 +42,111 @@ def get_key(data, keys: list):
     return ''
 
 
-def get_instance_info(instance_name: str):
-    """Collect info about instance"""
-    url = 'https://' + instance_name + '/api/v1/instance'
-    response = requests.get(url, timeout=TIMEOUT)
-    if response.status_code != 200:
-        raise InvalidResponseError("Could not get info for {}".format(instance_name))
-    return response.json()
-
-
-def get_instance_peers(instance_name: str):
-    """Collect connected instances"""
-    url = 'https://' + instance_name + '/api/v1/instance/peers'
-    response = requests.get(url, timeout=TIMEOUT)
-    if response.status_code != 200:
-        raise InvalidResponseError("Could not get peers for {}".format(instance_name))
-    return response.json()
-
-
-def process_instance(instance_name: str):
-    """Given an instance, get all the data we're interested in"""
-    print("Processing {}".format(instance_name))
-    data = dict()
-    try:
-        data['instance'] = instance_name
-        data['info'] = get_instance_info(instance_name)
-        data['peers'] = get_instance_peers(instance_name)
-        data['status'] = 'success'
-        print("Processed: {}".format(instance_name))
-        return data
-    except (InvalidResponseError,
-            requests.exceptions.RequestException,
-            json.decoder.JSONDecodeError) as e:
-        data['instance'] = instance_name
-        data['status'] = type(e).__name__
-        print("Failed: {}".format(instance_name))
-        return data
-
-
-def save_data(data):
-    """Save data"""
-    instance, _ = Instance.objects.get_or_create(name=get_key(data, ['instance']))
-    if data['status'] == 'success':
-        # Save stats
-        stats = InstanceStats(
-            instance=instance,
-            num_peers=get_key(data, ['info', 'stats', 'domain_count']),
-            num_statuses=get_key(data, ['info', 'stats', 'status_count']),
-            num_users=get_key(data, ['info', 'stats', 'user_count']),
-            version=get_key(data, ['info', 'version']),
-            status=get_key(data, ['status']),
-        )
-        stats.save()
-        # Save peers
-        # TODO: optimization opportunity here if we do this in bulk
-        # Make sure to consider race conditions
-        # https://stackoverflow.com/q/24502658/3697202
-        peers = [Instance.objects.get_or_create(name=n) for n in data['peers']]
-        instance.peers.add(*[peers])
-    else:
-        stats = InstanceStats(
-            instance=instance,
-            status=get_key(data, ['status'])
-        )
-        stats.save()
-
-
-def worker(queue: multiprocessing.JoinableQueue, done_bag: set):
-    """The main worker that processes URLs"""
-    while True:
-        # Get an item from the queue. Block if the queue is empty.
-        instance = queue.get()
-        if instance in done_bag:
-            print("Skipping {}, already done".format(instance))
-            queue.task_done()
-        else:
-            data = process_instance(instance)
-            if 'peers' in data:
-                for peer in [p for p in data['peers'] if p not in done_bag]:
-                    queue.put(peer)
-            save_data(data)
-            done_bag.add(instance)
-            queue.task_done()
-
-
 class Command(BaseCommand):
     help = "Scrapes the entire fediverse"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.done_bag = set()
+        self.existing_instance_ids = []
+
+    @staticmethod
+    def get_instance_info(instance_name: str):
+        """Collect info about instance"""
+        url = 'https://' + instance_name + '/api/v1/instance'
+        response = requests.get(url, timeout=TIMEOUT)
+        if response.status_code != 200:
+            raise InvalidResponseError("Could not get info for {}".format(instance_name))
+        return response.json()
+
+    @staticmethod
+    def get_instance_peers(instance_name: str):
+        """Collect connected instances"""
+        url = 'https://' + instance_name + '/api/v1/instance/peers'
+        response = requests.get(url, timeout=TIMEOUT)
+        if response.status_code != 200:
+            raise InvalidResponseError("Could not get peers for {}".format(instance_name))
+        return response.json()
+
+    def process_instance(self, instance_name: str):
+        """Given an instance, get all the data we're interested in"""
+        print("Processing {}".format(instance_name))
+        data = dict()
+        try:
+            data['instance'] = instance_name
+            data['info'] = self.get_instance_info(instance_name)
+            data['peers'] = self.get_instance_peers(instance_name)
+            data['status'] = 'success'
+            print("Processed: {}".format(instance_name))
+            return data
+        except (InvalidResponseError,
+                requests.exceptions.RequestException,
+                json.decoder.JSONDecodeError) as e:
+            data['instance'] = instance_name
+            data['status'] = type(e).__name__
+            print("Failed: {}".format(instance_name))
+            return data
+
+    def save_data(self, data):
+        """Save data"""
+        instance, _ = Instance.objects.get_or_create(name=get_key(data, ['instance']))
+        if data['status'] == 'success':
+            # Save stats
+            stats = InstanceStats(
+                instance=instance,
+                num_peers=get_key(data, ['info', 'stats', 'domain_count']),
+                num_statuses=get_key(data, ['info', 'stats', 'status_count']),
+                num_users=get_key(data, ['info', 'stats', 'user_count']),
+                version=get_key(data, ['info', 'version']),
+                status=get_key(data, ['status']),
+            )
+            stats.save()
+            # Save peers
+            # Save the list of instances we already have in the database
+            existing_peers = Instance.objects.filter(name__in=self.existing_instance_ids)
+            print("setting new_peer_ids")
+            new_peer_ids = [peer for peer in data['peers'] if peer not in self.existing_instance_ids]
+            if new_peer_ids:
+                print("setting new_peers (ids: {})".format(new_peer_ids))
+                new_peers = Instance.objects.bulk_create([Instance(name=peer) for peer in new_peer_ids])
+                print("adding to existing_instance_ids")
+                self.existing_instance_ids.extend(new_peer_ids)
+                print("adding new peers")
+                instance.peers.set(new_peers)
+            print("adding existing peers")
+            instance.peers.set(existing_peers)
+        else:
+            stats = InstanceStats(
+                instance=instance,
+                status=get_key(data, ['status'])
+            )
+            stats.save()
+
+    def worker(self, queue: multiprocessing.JoinableQueue):
+        """The main worker that processes URLs"""
+        while True:
+            # Get an item from the queue. Block if the queue is empty.
+            instance = queue.get()
+            if instance in self.done_bag:
+                print("Skipping {}, already done".format(instance))
+                queue.task_done()
+            else:
+                data = self.process_instance(instance)
+                if 'peers' in data:
+                    for peer in [p for p in data['peers'] if p not in self.done_bag]:
+                        queue.put(peer)
+                self.save_data(data)
+                self.done_bag.add(instance)
+                queue.task_done()
 
     def handle(self, *args, **options):
-        done_bag = set()
+        start_time = time.time()
+        self.existing_instance_ids = Instance.objects.all().values_list('name', flat=True)
+        print("Existing instances: {}".format(self.existing_instance_ids))
         queue = multiprocessing.JoinableQueue()
         queue.put(SEED)
-        pool = multiprocessing.Pool(THREADS, initializer=worker, initargs=(queue, done_bag))
+        # pool = multiprocessing.Pool(1, initializer=self.worker, initargs=(queue, ))  # Disable concurrency (debug)
+        pool = multiprocessing.Pool(initializer=self.worker, initargs=(queue, ))
         queue.join()
-        self.stdout.write(self.style.SUCCESS("Successfully scraped the fediverse"))
+        end_time = time.time()
+        self.stdout.write(self.style.SUCCESS("Successfully scraped the fediverse in {}s".format(end_time-start_time)))
diff --git a/scraper/migrations/0001_initial.py b/scraper/migrations/0001_initial.py
index 626f962..ced10ac 100644
--- a/scraper/migrations/0001_initial.py
+++ b/scraper/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 2.1 on 2018-08-26 00:29
+# Generated by Django 2.1 on 2018-08-26 17:26
 
 from django.db import migrations, models
 import django.db.models.deletion
@@ -15,9 +15,8 @@ class Migration(migrations.Migration):
         migrations.CreateModel(
             name='Instance',
             fields=[
-                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
-                ('name', models.CharField(max_length=200)),
-                ('peers', models.ManyToManyField(related_name='followers', to='scraper.Instance')),
+                ('name', models.CharField(max_length=200, primary_key=True, serialize=False)),
+                ('peers', models.ManyToManyField(to='scraper.Instance')),
             ],
         ),
         migrations.CreateModel(
@@ -25,10 +24,10 @@ class Migration(migrations.Migration):
             fields=[
                 ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                 ('timestamp', models.DateTimeField(auto_now_add=True)),
-                ('num_peers', models.IntegerField()),
-                ('num_statuses', models.IntegerField()),
-                ('num_users', models.IntegerField()),
-                ('version', models.CharField(max_length=1000)),
+                ('num_peers', models.IntegerField(blank=True, null=True)),
+                ('num_statuses', models.IntegerField(blank=True, null=True)),
+                ('num_users', models.IntegerField(blank=True, null=True)),
+                ('version', models.CharField(blank=True, max_length=1000)),
                 ('status', models.CharField(max_length=100)),
                 ('instance', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='scraper.Instance')),
             ],
diff --git a/scraper/migrations/0002_auto_20180826_0053.py b/scraper/migrations/0002_auto_20180826_0053.py
deleted file mode 100644
index 7fe5f9b..0000000
--- a/scraper/migrations/0002_auto_20180826_0053.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Generated by Django 2.1 on 2018-08-26 00:53
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('scraper', '0001_initial'),
-    ]
-
-    operations = [
-        migrations.RemoveField(
-            model_name='instance',
-            name='id',
-        ),
-        migrations.AlterField(
-            model_name='instance',
-            name='name',
-            field=models.CharField(max_length=200, primary_key=True, serialize=False),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_peers',
-            field=models.IntegerField(blank=True),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_statuses',
-            field=models.IntegerField(blank=True),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_users',
-            field=models.IntegerField(blank=True),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='version',
-            field=models.CharField(blank=True, max_length=1000),
-        ),
-    ]
diff --git a/scraper/migrations/0003_auto_20180826_0057.py b/scraper/migrations/0003_auto_20180826_0057.py
deleted file mode 100644
index 0794ad7..0000000
--- a/scraper/migrations/0003_auto_20180826_0057.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Generated by Django 2.1 on 2018-08-26 00:57
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('scraper', '0002_auto_20180826_0053'),
-    ]
-
-    operations = [
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_peers',
-            field=models.IntegerField(blank=True, null=True),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_statuses',
-            field=models.IntegerField(blank=True, null=True),
-        ),
-        migrations.AlterField(
-            model_name='instancestats',
-            name='num_users',
-            field=models.IntegerField(blank=True, null=True),
-        ),
-    ]
diff --git a/scraper/migrations/0004_auto_20180826_0100.py b/scraper/migrations/0004_auto_20180826_0100.py
deleted file mode 100644
index f3bbf41..0000000
--- a/scraper/migrations/0004_auto_20180826_0100.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Generated by Django 2.1 on 2018-08-26 01:00
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('scraper', '0003_auto_20180826_0057'),
-    ]
-
-    operations = [
-        migrations.AlterField(
-            model_name='instance',
-            name='peers',
-            field=models.ManyToManyField(to='scraper.Instance'),
-        ),
-    ]
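
Review note on save_data() above (a comment on the patch, not part of it):
Django's RelatedManager.set() *replaces* the entire many-to-many relation, so
the second call, instance.peers.set(existing_peers), discards the new_peers
attached one line earlier; add() appends without clearing. Separately,
handle() assigns existing_instance_ids from values_list(), which returns a
QuerySet, and QuerySet has no extend() method, so the later
self.existing_instance_ids.extend(...) would raise AttributeError. A minimal
sketch of a possible follow-up fix; the save_peers helper is hypothetical and
does not exist in the repo:

    def save_peers(instance, new_peers, existing_peers):
        """Attach both groups of peers without clearing either one."""
        instance.peers.add(*new_peers)       # instances just bulk_create()d
        instance.peers.add(*existing_peers)  # instances already in the DB

    # In handle(), materialise the queryset so extend() works later:
    # self.existing_instance_ids = list(
    #     Instance.objects.values_list('name', flat=True))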
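The concurrency in handle() is also easy to misread: multiprocessing.Pool is
used purely as a process launcher. Each pool process runs worker() as its
*initializer* and never returns from it, and queue.join() unblocks once every
put() has been matched by a task_done(). A standalone sketch of that pattern,
with illustrative names that are not from the repo:

    import multiprocessing

    def worker(queue):
        """Loop forever inside each pool process; the initializer never returns."""
        while True:
            item = queue.get()   # blocks while the queue is empty
            print("processing {}".format(item))
            queue.task_done()    # one task_done() per get(), or join() hangs

    if __name__ == '__main__':
        queue = multiprocessing.JoinableQueue()
        for item in ('a', 'b', 'c'):
            queue.put(item)
        # The pool's task machinery is never used; the initializer IS the worker.
        pool = multiprocessing.Pool(4, initializer=worker, initargs=(queue,))
        queue.join()        # returns once every queued item is marked done
        pool.terminate()    # the workers never exit on their own

One caveat that carries over to the patch: each forked worker gets its own
copy of the Command object, so done_bag and existing_instance_ids are
per-process rather than shared, and the same instance can still be scraped
more than once.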