From 62ed809ee0cfcc9bbde41ed266df02d19492de6a Mon Sep 17 00:00:00 2001
From: Tao Bojlen
Date: Mon, 3 Sep 2018 00:36:03 +0200
Subject: [PATCH] don't scrape personal instances

---
 scraper/management/commands/_util.py  | 10 ++++++-
 scraper/management/commands/scrape.py | 38 ++++++++++++++++++---------
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/scraper/management/commands/_util.py b/scraper/management/commands/_util.py
index 80c1871..2ec317f 100644
--- a/scraper/management/commands/_util.py
+++ b/scraper/management/commands/_util.py
@@ -42,11 +42,19 @@ def require_lock(model, lock):
     return require_lock_decorator
 
 
-class InvalidResponseError(Exception):
+class InvalidResponseException(Exception):
     """Used for all responses other than HTTP 200"""
     pass
 
 
+class PersonalInstanceException(Exception):
+    """
+    Used for instances that we don't want to scrape because there are too few users.
+    We don't want information on individuals, but aggregate statistics on instances and how they interact.
+    """
+    pass
+
+
 def get_key(data, keys: list):
     try:
         val = data[keys.pop(0)]
diff --git a/scraper/management/commands/scrape.py b/scraper/management/commands/scrape.py
index 9c07ff7..6aa4ef1 100644
--- a/scraper/management/commands/scrape.py
+++ b/scraper/management/commands/scrape.py
@@ -14,7 +14,7 @@ from django_bulk_update.helper import bulk_update
 from django.core.management.base import BaseCommand
 from django import db
 from scraper.models import Instance, PeerRelationship
-from scraper.management.commands._util import require_lock, InvalidResponseError, get_key, log, validate_int
+from scraper.management.commands._util import require_lock, InvalidResponseException, get_key, log, validate_int, PersonalInstanceException
 
 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 # Because the script uses the Mastodon API other platforms like           #
@@ -30,8 +30,9 @@ from scraper.management.commands._util import require_lock, InvalidResponseError
 # TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances
 
 SEED = 'mastodon.social'
-TIMEOUT = 10
+TIMEOUT = 10  # seconds
 NUM_THREADS = 64
+PERSONAL_INSTANCE_THRESHOLD = 5  # instances with fewer than this many users won't be scraped
 
 
 class Command(BaseCommand):
@@ -39,7 +40,7 @@
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.scraped_ids = set()
+        self.scraped_count = 0
 
     @staticmethod
     def get_instance_info(instance_name: str):
@@ -48,7 +49,7 @@
         response = requests.get(url, timeout=TIMEOUT)
         json = response.json()
         if response.status_code != 200 or get_key(json, ['error']):
-            raise InvalidResponseError("Could not get info for {}".format(instance_name))
+            raise InvalidResponseException("Could not get info for {}".format(instance_name))
         return json
 
     @staticmethod
@@ -60,7 +61,7 @@
         response = requests.get(url, timeout=TIMEOUT)
         peers = response.json()
         if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
-            raise InvalidResponseError("Could not get peers for {}".format(instance_name))
+            raise InvalidResponseException("Could not get peers for {}".format(instance_name))
         # Get rid of peers that just say "null" and the instance itself
         return [peer for peer in peers if peer and peer != instance_name]
 
@@ -77,7 +78,7 @@
             response = requests.get(url, timeout=TIMEOUT)
             statuses = response.json()
             if response.status_code != 200 or get_key(statuses, ['error']):
-                raise InvalidResponseError("Could not get statuses for {}".format(instance_name))
+                raise InvalidResponseException("Could not get statuses for {}".format(instance_name))
             elif len(statuses) == 0:
                 break
             # Get mentions from this instance
@@ -111,14 +112,23 @@
         try:
             data['instance_name'] = instance.name
             data['info'] = self.get_instance_info(instance.name)
+
+            # Check if this is a personal instance before continuing
+            user_count = get_key(data, ['info', 'stats', 'user_count'])
+            if isinstance(user_count, int) and user_count < PERSONAL_INSTANCE_THRESHOLD:
+                raise PersonalInstanceException
+
             data['peers'] = self.get_instance_peers(instance.name)
             if not data['info'] and not data['peers']:
                 # We got a response from the instance, but it didn't have any of the information we were expecting.
-                raise InvalidResponseError
+                raise InvalidResponseException
+
             data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
             data['status'] = 'success'
             return data
-        except (InvalidResponseError,
+
+        except (InvalidResponseException,
+                PersonalInstanceException,
                 requests.exceptions.RequestException,
                 json.decoder.JSONDecodeError) as e:
             data['instance_name'] = instance.name
@@ -171,13 +181,13 @@
         self.stdout.write(log("Saved {}".format(data['instance_name'])))
 
-    def worker(self, queue: mp.JoinableQueue, existing_instance_ids):
+    def worker(self, queue: mp.JoinableQueue, existing_instance_ids, scraped_ids):
         """The main worker that processes URLs"""
        # https://stackoverflow.com/a/38356519/3697202
         db.connections.close_all()
         while True:
             instance = queue.get()
-            if instance.name in self.scraped_ids:
+            if instance.name in scraped_ids:
                 self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!"
                                       .format(instance)))
                 queue.task_done()
                 continue
@@ -186,7 +196,7 @@
             self.stdout.write(log("Processing {}".format(instance.name)))
             data = self.process_instance(instance)
             self.save_data(instance, data, queue, existing_instance_ids)
-            self.scraped_ids.add(instance.name)
+            scraped_ids[instance.name] = 1
             queue.task_done()
 
     def handle(self, *args, **options):
@@ -196,6 +206,7 @@
         # Share the list of existing instances amongst all threads (to avoid each thread having to query
         # for it on every instance it scrapes)
         existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
+        scraped_ids = manager.dict()
         queue = mp.JoinableQueue()
         if stale_instances:
             for instance in stale_instances:
@@ -205,9 +216,10 @@
                 existing_instance_ids.append(instance.name)
                 queue.put(instance)
 
-        pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids))
+        pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids, scraped_ids))
         queue.join()
+        self.scraped_count = len(scraped_ids.keys())
 
         end_time = time.time()
         self.stdout.write(self.style.SUCCESS(log("Scraped {} instances in {:.0f}s"
-                                                 .format(len(self.scraped_ids), end_time - start_time))))
+                                                 .format(self.scraped_count, end_time - start_time))))
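
Note on the scraped_ids change (an addendum, not part of the commit): mp.Pool
runs each worker in its own process, so the old `self.scraped_ids` set was only
ever mutated in each worker's copy of the Command object, and the parent's
closing `len(self.scraped_ids)` undercounted. A `manager.dict()` proxy is
genuinely shared across processes; since multiprocessing offers no shared set
type, the dict is used as a set via `scraped_ids[instance.name] = 1`. Below is
a minimal standalone sketch of that pattern, assuming nothing from this repo;
the hostnames and the trivial worker body are invented for illustration:

    import multiprocessing as mp


    def worker(queue, scraped_ids):
        # Runs in every pool process; the initializer never returns, which is
        # the same trick the patch uses (workers just drain the queue forever).
        while True:
            name = queue.get()
            scraped_ids[name] = 1  # dict-as-set: only the key matters
            queue.task_done()


    if __name__ == '__main__':
        manager = mp.Manager()
        scraped_ids = manager.dict()  # shared across processes, unlike a plain set()
        queue = mp.JoinableQueue()
        for name in ['a.example', 'b.example', 'a.example']:
            queue.put(name)
        pool = mp.Pool(4, initializer=worker, initargs=(queue, scraped_ids))
        queue.join()       # returns once task_done() matches every put()
        pool.terminate()   # the workers loop forever, so stop them explicitly
        print(len(scraped_ids.keys()))  # 2: the duplicate hostname collapses

Writing the same key from two processes is idempotent, so the racy
check-then-write around the shared dict is harmless here: the dict only has to
answer "was this name seen at least once" when the run finishes.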