don't scrape personal instances
This commit is contained in:
parent
a176e35ec2
commit
62ed809ee0
|
@ -42,11 +42,19 @@ def require_lock(model, lock):
|
|||
return require_lock_decorator
|
||||
|
||||
|
||||
class InvalidResponseError(Exception):
|
||||
class InvalidResponseException(Exception):
|
||||
"""Used for all responses other than HTTP 200"""
|
||||
pass
|
||||
|
||||
|
||||
class PersonalInstanceException(Exception):
|
||||
"""
|
||||
Used for instances that we don't want to scrape because there are too few users.
|
||||
We don't want information on individuals, but aggregate statistics on instances and how they interact.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def get_key(data, keys: list):
|
||||
try:
|
||||
val = data[keys.pop(0)]
|
||||
|
|
|
@ -14,7 +14,7 @@ from django_bulk_update.helper import bulk_update
|
|||
from django.core.management.base import BaseCommand
|
||||
from django import db
|
||||
from scraper.models import Instance, PeerRelationship
|
||||
from scraper.management.commands._util import require_lock, InvalidResponseError, get_key, log, validate_int
|
||||
from scraper.management.commands._util import require_lock, InvalidResponseException, get_key, log, validate_int, PersonalInstanceException
|
||||
|
||||
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
||||
# Because the script uses the Mastodon API other platforms like #
|
||||
|
@ -30,8 +30,9 @@ from scraper.management.commands._util import require_lock, InvalidResponseError
|
|||
# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances
|
||||
|
||||
SEED = 'mastodon.social'
|
||||
TIMEOUT = 10
|
||||
TIMEOUT = 10 # seconds
|
||||
NUM_THREADS = 64
|
||||
PERSONAL_INSTANCE_THRESHOLD = 5 # instances with <= this many users won't be scraped
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
@ -39,7 +40,7 @@ class Command(BaseCommand):
|
|||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.scraped_ids = set()
|
||||
self.scraped_count = 0
|
||||
|
||||
@staticmethod
|
||||
def get_instance_info(instance_name: str):
|
||||
|
@ -48,7 +49,7 @@ class Command(BaseCommand):
|
|||
response = requests.get(url, timeout=TIMEOUT)
|
||||
json = response.json()
|
||||
if response.status_code != 200 or get_key(json, ['error']):
|
||||
raise InvalidResponseError("Could not get info for {}".format(instance_name))
|
||||
raise InvalidResponseException("Could not get info for {}".format(instance_name))
|
||||
return json
|
||||
|
||||
@staticmethod
|
||||
|
@ -60,7 +61,7 @@ class Command(BaseCommand):
|
|||
response = requests.get(url, timeout=TIMEOUT)
|
||||
peers = response.json()
|
||||
if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
|
||||
raise InvalidResponseError("Could not get peers for {}".format(instance_name))
|
||||
raise InvalidResponseException("Could not get peers for {}".format(instance_name))
|
||||
# Get rid of peers that just say "null" and the instance itself
|
||||
return [peer for peer in peers if peer and peer != instance_name]
|
||||
|
||||
|
@ -77,7 +78,7 @@ class Command(BaseCommand):
|
|||
response = requests.get(url, timeout=TIMEOUT)
|
||||
statuses = response.json()
|
||||
if response.status_code != 200 or get_key(statuses, ['error']):
|
||||
raise InvalidResponseError("Could not get statuses for {}".format(instance_name))
|
||||
raise InvalidResponseException("Could not get statuses for {}".format(instance_name))
|
||||
elif len(statuses) == 0:
|
||||
break
|
||||
# Get mentions from this instance
|
||||
|
@ -111,14 +112,23 @@ class Command(BaseCommand):
|
|||
try:
|
||||
data['instance_name'] = instance.name
|
||||
data['info'] = self.get_instance_info(instance.name)
|
||||
|
||||
# Check if this is a personal instance before continuing
|
||||
user_count = get_key(data, ['info', 'stats', 'user_count'])
|
||||
if isinstance(user_count, int) and user_count < PERSONAL_INSTANCE_THRESHOLD:
|
||||
raise PersonalInstanceException
|
||||
|
||||
data['peers'] = self.get_instance_peers(instance.name)
|
||||
if not data['info'] and not data['peers']:
|
||||
# We got a response from the instance, but it didn't have any of the information we were expecting.
|
||||
raise InvalidResponseError
|
||||
raise InvalidResponseException
|
||||
|
||||
data['mentions'], data['statuses_seen'] = self.get_statuses(instance.name)
|
||||
data['status'] = 'success'
|
||||
return data
|
||||
except (InvalidResponseError,
|
||||
|
||||
except (InvalidResponseException,
|
||||
PersonalInstanceException,
|
||||
requests.exceptions.RequestException,
|
||||
json.decoder.JSONDecodeError) as e:
|
||||
data['instance_name'] = instance.name
|
||||
|
@ -171,13 +181,13 @@ class Command(BaseCommand):
|
|||
|
||||
self.stdout.write(log("Saved {}".format(data['instance_name'])))
|
||||
|
||||
def worker(self, queue: mp.JoinableQueue, existing_instance_ids):
|
||||
def worker(self, queue: mp.JoinableQueue, existing_instance_ids, scraped_ids):
|
||||
"""The main worker that processes URLs"""
|
||||
# https://stackoverflow.com/a/38356519/3697202
|
||||
db.connections.close_all()
|
||||
while True:
|
||||
instance = queue.get()
|
||||
if instance.name in self.scraped_ids:
|
||||
if instance.name in scraped_ids:
|
||||
self.stderr.write(log("Skipping {}, already done. This should not have been added to the queue!"
|
||||
.format(instance)))
|
||||
queue.task_done()
|
||||
|
@ -186,7 +196,7 @@ class Command(BaseCommand):
|
|||
self.stdout.write(log("Processing {}".format(instance.name)))
|
||||
data = self.process_instance(instance)
|
||||
self.save_data(instance, data, queue, existing_instance_ids)
|
||||
self.scraped_ids.add(instance.name)
|
||||
scraped_ids[instance.name] = 1
|
||||
queue.task_done()
|
||||
|
||||
def handle(self, *args, **options):
|
||||
|
@ -196,6 +206,7 @@ class Command(BaseCommand):
|
|||
# Share the list of existing instances amongst all threads (to avoid each thread having to query
|
||||
# for it on every instance it scrapes)
|
||||
existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
|
||||
scraped_ids = manager.dict()
|
||||
queue = mp.JoinableQueue()
|
||||
if stale_instances:
|
||||
for instance in stale_instances:
|
||||
|
@ -205,9 +216,10 @@ class Command(BaseCommand):
|
|||
existing_instance_ids.append(instance.name)
|
||||
queue.put(instance)
|
||||
|
||||
pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids))
|
||||
pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids, scraped_ids))
|
||||
queue.join()
|
||||
self.scraped_count = len(scraped_ids.keys())
|
||||
|
||||
end_time = time.time()
|
||||
self.stdout.write(self.style.SUCCESS(log("Scraped {} instances in {:.0f}s"
|
||||
.format(len(self.scraped_ids), end_time - start_time))))
|
||||
.format(self.scraped_count, end_time - start_time))))
|
||||
|
|
Loading…
Reference in a new issue