"""
|
|
|
|
This script starts at a seed instance and loads the list of connected
|
2018-08-26 18:15:50 +00:00
|
|
|
peers. From there, it scrapes the peers of all instances it finds,
|
2018-08-26 01:17:10 +00:00
|
|
|
gradually mapping the fediverse.
|
|
|
|
"""
|
|
|
|
import json
import multiprocessing
import requests
import time

from django.core.management.base import BaseCommand
from django.db import transaction

from scraper.models import Instance, InstanceStats
from scraper.management.commands._util import require_lock

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Because the script uses the Mastodon API, other platforms like        #
# Pleroma, Peertube, Pixelfed, and Funkwhale won't have outgoing peers. #
#                                                                       #
# The script generates two files:                                       #
# - nodes.csv                                                           #
# - edges.csv                                                           #
#                                                                       #
# Change SEED to start from a different instance.                       #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

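# Only two Mastodon endpoints are needed per instance:
#   GET /api/v1/instance        (metadata and stats: users, statuses, peer domains)
#   GET /api/v1/instance/peers  (list of peer domain names)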
# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances

SEED = 'mastodon.social'
TIMEOUT = 20


class InvalidResponseError(Exception):
    """Used for all responses other than HTTP 200"""
    pass


def get_key(data, keys: list):
    """Walk into a nested dict following keys; return '' if any key is missing.

    e.g. get_key(data, ['info', 'stats', 'user_count'])
    """
    try:
        val = data[keys.pop(0)]
        while keys:
            val = val[keys.pop(0)]
        return val
    except KeyError:
        return ''


class Command(BaseCommand):
    help = "Scrapes the entire fediverse"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Instances already processed by this worker. With multiprocessing,
        # each worker process gets its own copy of this set.
        self.done_bag = set()

    @staticmethod
    def get_instance_info(instance_name: str):
        """Collect info about instance"""
        url = 'https://' + instance_name + '/api/v1/instance'
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code != 200:
            raise InvalidResponseError("Could not get info for {}".format(instance_name))
        return response.json()

    @staticmethod
    def get_instance_peers(instance_name: str):
        """Collect connected instances"""
        url = 'https://' + instance_name + '/api/v1/instance/peers'
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code != 200:
            raise InvalidResponseError("Could not get peers for {}".format(instance_name))
        return response.json()

    def process_instance(self, instance_name: str):
        """Given an instance, get all the data we're interested in"""
        print("Processing {}".format(instance_name))
        data = dict()
        try:
            data['instance'] = instance_name
            data['info'] = self.get_instance_info(instance_name)
            data['peers'] = self.get_instance_peers(instance_name)
            data['status'] = 'success'
            print("Processed: {}".format(instance_name))
            return data
        except (InvalidResponseError,
                requests.exceptions.RequestException,
                json.decoder.JSONDecodeError) as e:
            data['instance'] = instance_name
            data['status'] = type(e).__name__
            print("Failed: {}".format(instance_name))
            return data

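    # save_data runs inside a transaction while holding require_lock, which
    # appears to take a PostgreSQL ACCESS EXCLUSIVE lock on the Instance table;
    # this keeps concurrent workers from bulk-creating duplicate Instance rows.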
    @transaction.atomic
    @require_lock(Instance, 'ACCESS EXCLUSIVE')
    def save_data(self, data):
        """Save data"""
        instance, _ = Instance.objects.get_or_create(name=get_key(data, ['instance']))
        if data['status'] == 'success':
            # Save stats
            stats = InstanceStats(
                instance=instance,
                num_peers=get_key(data, ['info', 'stats', 'domain_count']),
                num_statuses=get_key(data, ['info', 'stats', 'status_count']),
                num_users=get_key(data, ['info', 'stats', 'user_count']),
                version=get_key(data, ['info', 'version']),
                status=get_key(data, ['status']),
            )
            stats.save()
            # Save peers
            # TODO: make this shared amongst threads so the database only needs to be queried once
            existing_instance_ids = Instance.objects.values_list('name', flat=True)
            # Peers of this instance that are already in the database
            existing_peers = Instance.objects.filter(name__in=data['peers'])
            new_peer_ids = [peer for peer in data['peers'] if peer not in existing_instance_ids]
            if new_peer_ids:
                new_peers = Instance.objects.bulk_create([Instance(name=peer) for peer in new_peer_ids])
                # set() replaces the relation, so newly created and existing
                # peers have to go in a single call.
                instance.peers.set(new_peers + list(existing_peers))
            else:
                instance.peers.set(existing_peers)
        else:
            stats = InstanceStats(
                instance=instance,
                status=get_key(data, ['status'])
            )
            stats.save()

    def worker(self, queue: multiprocessing.JoinableQueue):
        """The main worker that processes instance names from the queue"""
        while True:
            # Get an item from the queue. Block if the queue is empty.
            instance = queue.get()
            if instance in self.done_bag:
                print("Skipping {}, already done".format(instance))
                queue.task_done()
            else:
                data = self.process_instance(instance)
                if 'peers' in data:
                    # Enqueue newly discovered peers so they get scraped too.
                    for peer in [p for p in data['peers'] if p not in self.done_bag]:
                        queue.put(peer)
                self.save_data(data)
                self.done_bag.add(instance)
                queue.task_done()

    def handle(self, *args, **options):
        start_time = time.time()
        queue = multiprocessing.JoinableQueue()
        queue.put(SEED)
        # Each pool process runs self.worker as its initializer; the workers loop
        # forever pulling instance names off the shared queue, so no tasks are
        # ever submitted to the pool itself.
        # pool = multiprocessing.Pool(1, initializer=self.worker, initargs=(queue, ))  # Disable concurrency (debug)
        pool = multiprocessing.Pool(initializer=self.worker, initargs=(queue, ))
        # Block until every queued instance has been marked task_done().
        queue.join()
        end_time = time.time()
        self.stdout.write(self.style.SUCCESS("Successfully scraped the fediverse in {}s".format(end_time - start_time)))
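
# Example invocation (assumption: this file is saved as
# scraper/management/commands/scrape.py, making the command name "scrape"):
#
#   python manage.py scrape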