index.community/scraper/management/commands/scrape.py

143 lines
6.2 KiB
Python
Raw Normal View History

2018-08-26 01:17:10 +00:00
"""
This script starts at a seed instance and loads the list of connected
peers. From there, it scrapes the peers of all instances it finds,
gradually mapping the fediverse.
"""
import json
import multiprocessing
import time
from datetime import datetime

import requests

from django.core.management.base import BaseCommand
from django.db import transaction

from scraper.models import Instance, InstanceStats
from scraper.management.commands._util import require_lock, InvalidResponseError, get_key

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Because the script uses the Mastodon API, other platforms like         #
# Pleroma, PeerTube, Pixelfed, and Funkwhale won't have outgoing peers.  #
#                                                                         #
# Results are saved to the database (Instance / InstanceStats models).   #
#                                                                         #
# Change SEED to start from a different instance.                        #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# TODO: use the /api/v1/server/followers and /api/v1/server/following endpoints in peertube instances
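
# For reference, the fields read below come from Mastodon's /api/v1/instance
# endpoint, whose JSON response looks roughly like
#     {"version": "...", "stats": {"user_count": ..., "status_count": ..., "domain_count": ...}}
# and from /api/v1/instance/peers, which returns a JSON array of peer domain
# names (occasionally containing nulls, which process_instance() filters out).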
SEED = 'mastodon.social'
TIMEOUT = 20
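
# The crawl is effectively a breadth-first traversal: handle() seeds a
# JoinableQueue with SEED, each process in the multiprocessing.Pool runs
# worker() in a loop, and every newly discovered peer is pushed back onto the
# queue until queue.join() sees that all queued items are done. Note that
# done_bag is a per-process set (each pool worker gets its own copy of this
# Command instance), so an instance may occasionally be scraped more than once.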


class Command(BaseCommand):
    help = "Scrapes the entire fediverse"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.done_bag = set()

    @staticmethod
    def get_instance_info(instance_name: str):
        """Collect info about instance"""
        url = 'https://' + instance_name + '/api/v1/instance'
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code != 200:
            raise InvalidResponseError("Could not get info for {}".format(instance_name))
        return response.json()

    @staticmethod
    def get_instance_peers(instance_name: str):
        """Collect connected instances"""
        url = 'https://' + instance_name + '/api/v1/instance/peers'
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code != 200:
            raise InvalidResponseError("Could not get peers for {}".format(instance_name))
        return response.json()

    def process_instance(self, instance_name: str):
        """Given an instance, get all the data we're interested in"""
        self.stdout.write("{} - Processing {}".format(datetime.now().isoformat(), instance_name))
        data = dict()
        try:
            data['instance'] = instance_name
            data['info'] = self.get_instance_info(instance_name)
            data['peers'] = [peer for peer in self.get_instance_peers(instance_name) if peer]  # get rid of null peers
            data['status'] = 'success'
            return data
        except (InvalidResponseError,
                requests.exceptions.RequestException,
                json.decoder.JSONDecodeError) as e:
            data['instance'] = instance_name
            data['status'] = type(e).__name__
            return data

    @transaction.atomic
    @require_lock(Instance, 'ACCESS EXCLUSIVE')
    def save_data(self, data):
        """Save the collected data to the database"""
        user_count = get_key(data, ['info', 'stats', 'user_count'])
        if user_count:
            instance, _ = Instance.objects.update_or_create(
                name=get_key(data, ['instance']),
                defaults={'user_count': user_count},
            )
        else:
            instance, _ = Instance.objects.get_or_create(name=get_key(data, ['instance']))
        if data['status'] == 'success':
            # Save stats
            stats = InstanceStats(
                instance=instance,
                domain_count=get_key(data, ['info', 'stats', 'domain_count']),
                status_count=get_key(data, ['info', 'stats', 'status_count']),
                user_count=get_key(data, ['info', 'stats', 'user_count']),
                version=get_key(data, ['info', 'version']),
                status=get_key(data, ['status']),
            )
            stats.save()
            # Save peers
            # TODO: make this shared amongst threads so the database only needs to be queried once
            if not data['peers']:
                return
            existing_instance_ids = Instance.objects.values_list('name', flat=True)
            # Only the instances named in data['peers'] are this instance's peers;
            # .set() replaces the whole relation, so pass new and existing peers together.
            existing_peers = Instance.objects.filter(name__in=data['peers'])
            new_peer_ids = [peer for peer in data['peers'] if peer not in existing_instance_ids]
            if new_peer_ids:
                new_peers = Instance.objects.bulk_create([Instance(name=peer) for peer in new_peer_ids])
                instance.peers.set(list(existing_peers) + new_peers)
            else:
                instance.peers.set(existing_peers)
        else:
            stats = InstanceStats(
                instance=instance,
                status=get_key(data, ['status'])
            )
            stats.save()

        self.stdout.write("{} - Saved {}".format(datetime.now().isoformat(), data['instance']))

    def worker(self, queue: multiprocessing.JoinableQueue):
        """The main worker that processes instance names from the queue"""
        while True:
            # Get an item from the queue. Block if the queue is empty.
            instance = queue.get()
            if instance in self.done_bag:
                print("Skipping {}, already done".format(instance))
                queue.task_done()
            else:
                data = self.process_instance(instance)
                if 'peers' in data:
                    for peer in [p for p in data['peers'] if p not in self.done_bag]:
                        queue.put(peer)
                self.save_data(data)
                self.done_bag.add(instance)
                queue.task_done()

    def handle(self, *args, **options):
        start_time = time.time()

        queue = multiprocessing.JoinableQueue()
        queue.put(SEED)

        # pool = multiprocessing.Pool(1, initializer=self.worker, initargs=(queue, ))  # Disable concurrency (debug)
        pool = multiprocessing.Pool(initializer=self.worker, initargs=(queue, ))
        queue.join()

        end_time = time.time()
        self.stdout.write(self.style.SUCCESS("Successfully scraped the fediverse in {:.0f}s".format(end_time - start_time)))