scraper perf improvements

This commit is contained in:
Tao Bojlen 2018-09-01 20:46:00 +02:00
parent 555783bad3
commit d383bca0d8
3 changed files with 26 additions and 21 deletions

View file

@ -1,4 +1,5 @@
from rest_framework import viewsets
from django.db.models import Prefetch
from scraper.models import Instance, PeerRelationship
from apiv1.serializers import InstanceListSerializer, InstanceDetailSerializer, NodeSerializer, EdgeSerializer
@ -32,6 +33,5 @@ class NodeView(viewsets.ReadOnlyModelViewSet):
"""
Endpoint to get a list of the graph's nodes in a SigmaJS-friendly format.
"""
# queryset = Instance.objects.filter(status='success')
queryset = Instance.objects.all()
queryset = Instance.objects.filter(status='success')
serializer_class = NodeSerializer

View file

@ -29,7 +29,7 @@ class OptionalTrailingSlashRouter(routers.DefaultRouter):
router = OptionalTrailingSlashRouter()
router.register(r'instances', views.InstanceViewSet)
router.register(r'graph/nodes', views.NodeView)
router.register(r'graph/edges', views.EdgeView)
router.register(r'graph/edges', views.EdgeView, base_name='edge')
urlpatterns = [
path('api/v1/', include(router.urls)),

View file

@ -4,7 +4,7 @@ peers. From there, it scrapes the peers of all instances it finds,
gradually mapping the fediverse.
"""
import json
import multiprocessing
import multiprocessing as mp
import requests
import time
from datetime import datetime, timedelta
@ -81,10 +81,9 @@ class Command(BaseCommand):
@db.transaction.atomic
@require_lock(Instance, 'ACCESS EXCLUSIVE')
def save_data(self, instance, data, queue):
def save_data(self, instance, data, queue, existing_instance_ids):
"""Save data"""
# Validate the ints. Some servers that appear to be fake instances have e.g. negative numbers here.
# TODO: these always return 1!
instance.domain_count = validate_int(get_key(data, ['info', 'stats', 'domain_count']))
instance.status_count = validate_int(get_key(data, ['info', 'stats', 'status_count']))
instance.user_count = validate_int(get_key(data, ['info', 'stats', 'user_count']))
@ -95,12 +94,13 @@ class Command(BaseCommand):
if data['status'] == 'success' and data['peers']:
# TODO: handle a peer disappeer-ing
# Create instances for the peers we haven't seen before and add them to the queue
# TODO: share this among all threads so we only have to call it once at the start
existing_instance_ids = Instance.objects.values_list('name', flat=True)
new_instance_ids = [peer_id for peer_id in data['peers'] if peer_id not in existing_instance_ids]
# bulk_create doesn't call save(), so the auto_now_add field won't get set automatically
new_instances = [Instance(name=id, first_seen=datetime.now(), last_updated=datetime.now())
for id in new_instance_ids]
print("Before: {}".format(len(existing_instance_ids)))
existing_instance_ids.extend(new_instance_ids)
print("After: {}".format(len(existing_instance_ids)))
Instance.objects.bulk_create(new_instances)
for new_instance in new_instances:
queue.put(new_instance)
@ -109,13 +109,13 @@ class Command(BaseCommand):
existing_peer_ids = PeerRelationship.objects.filter(source=instance).values_list('target', flat=True)
new_peer_ids = [peer_id for peer_id in data['peers'] if peer_id not in existing_peer_ids]
if new_peer_ids:
new_peers = Instance.objects.filter(name__in=new_peer_ids)
new_relationships = [PeerRelationship(source=instance, target=new_peer, first_seen=datetime.now())
for new_peer in new_peers]
# new_peers = Instance.objects.filter(name__in=new_peer_ids)
new_relationships = [PeerRelationship(source=instance, target_id=new_peer, first_seen=datetime.now())
for new_peer in new_peer_ids]
PeerRelationship.objects.bulk_create(new_relationships)
self.stdout.write(log("Saved {}".format(data['instance_name'])))
def worker(self, queue: multiprocessing.JoinableQueue):
def worker(self, queue: mp.JoinableQueue, existing_instance_ids):
"""The main worker that processes URLs"""
# https://stackoverflow.com/a/38356519/3697202
db.connections.close_all()
@ -128,21 +128,26 @@ class Command(BaseCommand):
# Fetch data on instance
self.stdout.write(log("Processing {}".format(instance.name)))
data = self.process_instance(instance)
self.save_data(instance, data, queue)
self.save_data(instance, data, queue, existing_instance_ids)
self.done_bag.add(instance)
queue.task_done()
def handle(self, *args, **options):
start_time = time.time()
stale_instances = Instance.objects.filter(last_updated__lte=datetime.now()-timedelta(weeks=1))
queue = multiprocessing.JoinableQueue()
if stale_instances:
queue.put(list(stale_instances))
elif not Instance.objects.exists():
instance, _ = Instance.objects.get_or_create(name=SEED)
queue.put(instance)
with mp.Manager() as manager:
# Share the list of existing instances amongst all threads (to avoid each thread having to query
# for it on every instance it scrapes)
existing_instance_ids = manager.list(list(Instance.objects.values_list('name', flat=True)))
queue = mp.JoinableQueue()
if stale_instances:
queue.put(list(stale_instances))
elif not Instance.objects.exists():
instance, _ = Instance.objects.get_or_create(name=SEED)
queue.put(instance)
existing_instance_ids.append(instance.name)
pool = multiprocessing.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, ))
queue.join()
pool = mp.Pool(NUM_THREADS, initializer=self.worker, initargs=(queue, existing_instance_ids))
queue.join()
end_time = time.time()
self.stdout.write(self.style.SUCCESS(log("Successfully scraped the fediverse in {:.0f}s".format(end_time-start_time))))