Merge branch 'develop' into 'production'

Clean up crawler

See merge request taobojlen/fediverse.space!46
This commit is contained in:
Tao Bojlén 2019-02-28 18:09:26 +00:00
commit 9ad535c90c
2 changed files with 35 additions and 7 deletions

View file

@ -32,5 +32,8 @@ class NodeView(viewsets.ReadOnlyModelViewSet):
""" """
Endpoint to get a list of the graph's nodes in a SigmaJS-friendly format. Endpoint to get a list of the graph's nodes in a SigmaJS-friendly format.
""" """
queryset = Instance.objects.filter(status='success') queryset = Instance.objects.filter(status='success')\
.filter(x_coord__isnull=False)\
.filter(y_coord__isnull=False)\
.filter(user_count__isnull=False)
serializer_class = NodeSerializer serializer_class = NodeSerializer

View file

@ -24,13 +24,30 @@ from scraper.management.commands._util import require_lock, InvalidResponseExcep
SEED = 'mastodon.social' SEED = 'mastodon.social'
TIMEOUT = 20 # seconds TIMEOUT = 20 # seconds
NUM_THREADS = 16 # roughly 40MB each NUM_THREADS = 16 # roughly 40MB each
PERSONAL_INSTANCE_THRESHOLD = 5 # instances with < this many users won't be scraped PERSONAL_INSTANCE_THRESHOLD = 10 # instances with < this many users won't be crawled
MAX_STATUSES_PER_PAGE = 100
STATUS_SCRAPE_LIMIT = 5000 STATUS_SCRAPE_LIMIT = 5000
INSTANCE_SCRAPE_LIMIT = 50 # note: this does not include newly discovered instances! they will always be crawled.
class Command(BaseCommand): class Command(BaseCommand):
help = "Scrapes the entire fediverse" help = "Scrapes the entire fediverse"
def add_arguments(self, parser):
# Named (optional) arguments
parser.add_argument(
'--unlimited',
action='store_true',
dest='unlimited',
help="Crawl all stale instances rather than limiting to {}".format(INSTANCE_SCRAPE_LIMIT),
)
parser.add_argument(
'--all',
action='store_true',
dest='all',
help="Crawl all instances rather than limiting to stale ones"
)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.scraped_count = 0 self.scraped_count = 0
@ -59,7 +76,8 @@ class Command(BaseCommand):
if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']): if response.status_code != 200 or not isinstance(peers, list) or get_key(peers, ['error']):
raise InvalidResponseException("Could not get peers for {}".format(instance_name)) raise InvalidResponseException("Could not get peers for {}".format(instance_name))
# Get rid of peers that just say "null" and the instance itself # Get rid of peers that just say "null" and the instance itself
return [peer for peer in peers if peer and peer != instance_name] # Also make sure to lowercase all instance names; otherwise there'll be some duplicates
return [peer.lower() for peer in peers if peer and peer != instance_name]
@staticmethod @staticmethod
def get_statuses(instance_name: str): def get_statuses(instance_name: str):
@ -67,9 +85,9 @@ class Command(BaseCommand):
mentions = [] mentions = []
datetime_threshold = datetime.now(timezone.utc) - timedelta(days=31) datetime_threshold = datetime.now(timezone.utc) - timedelta(days=31)
statuses_seen = 0 statuses_seen = 0
# We'll ask for 1000 statuses, but Mastodon never returns more than 40. Some Pleroma instances will ignore # We'll ask for lots of statuses, but Mastodon never returns more than 40. Some Pleroma instances will ignore
# the limit and return 20. # the limit and return 20.
url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000' url = 'https://{}/api/v1/timelines/public?local=true&limit={}/'.format(instance_name, MAX_STATUSES_PER_PAGE)
while True: while True:
response = requests.get(url, timeout=TIMEOUT) response = requests.get(url, timeout=TIMEOUT)
statuses = response.json() statuses = response.json()
@ -91,7 +109,7 @@ class Command(BaseCommand):
break break
# Continuing, so get url for next page # Continuing, so get url for next page
min_id = earliest_status['id'] min_id = earliest_status['id']
url = 'https://' + instance_name + '/api/v1/timelines/public?local=true&limit=1000&max_id=' + min_id url = 'https://{}/api/v1/timelines/public?local=true&limit={}&max_id={}'.format(instance_name, MAX_STATUSES_PER_PAGE, min_id)
time.sleep(2) # Sleep to avoid overloading the instance time.sleep(2) # Sleep to avoid overloading the instance
mentions_seq = (seq(mentions) mentions_seq = (seq(mentions)
@ -201,7 +219,14 @@ class Command(BaseCommand):
def handle(self, *args, **options): def handle(self, *args, **options):
start_time = time.time() start_time = time.time()
stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(days=1)) if options['all']:
stale_instances = Instance.objects.all()
else:
stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(days=1))
if not options['unlimited']:
stale_instances = stale_instances[:INSTANCE_SCRAPE_LIMIT]
with mp.Manager() as manager: with mp.Manager() as manager:
# Share the list of existing instances amongst all threads (to avoid each thread having to query # Share the list of existing instances amongst all threads (to avoid each thread having to query
# for it on every instance it scrapes) # for it on every instance it scrapes)