Source code for smug.savers.mongo_save

import pkg_resources
from pymongo import UpdateOne
import os
from dotenv import load_dotenv
import simplejson as json
from bson import json_util
import threading

from smug.mongo_manager import MongoManager
from smug.connection_manager import ConnectionManager


class MongoSave():
    """Buffer messages received from a queue and flush them to MongoDB in bulk.

    Messages are collected in ``self.buffer`` keyed by their delivery tag.
    Once the buffer reaches ``write_buffer_size`` entries (or on every
    message when buffering is disabled) the buffer is written with a single
    ``bulk_write`` and the corresponding deliveries are acknowledged.
    """

    def __init__(self, write_buffer_size, buffer_enabled=True):
        """Create an empty buffer and the MongoDB manager used for writes.

        :param write_buffer_size: number of buffered messages that triggers
            a flush from :meth:`callback`.
        :param buffer_enabled: when ``False`` every received message is
            flushed immediately.
        """
        # delivery_tag -> decoded message body
        self.buffer = {}
        self.write_buffer_size = write_buffer_size
        # Channel of the most recent delivery; set by callback(), used to ack.
        self.ch = None
        self.mongo_manager = MongoManager()
        # Re-entrant so that callback() can call save() while holding it.
        self.lock = threading.RLock()
        self.buffer_enabled = buffer_enabled

    def save(self):
        """Flush the buffer to MongoDB and acknowledge the flushed deliveries.

        Does nothing when the buffer is empty. Otherwise the most recently
        buffered message is published to the ``'latest'`` queue, every
        buffered message is upserted (matched on ``metadata.url``) and its
        ``reports`` are merged into the stored document with ``$addToSet``.

        Deliveries are acknowledged only after ``bulk_write`` has returned,
        so a failed write leaves the messages unacknowledged and still
        buffered instead of silently dropping them. The lock is released
        even if any step raises.
        """
        with self.lock:
            if not self.buffer:
                return

            messages = list(self.buffer.values())
            latest_message = json.dumps(messages[-1],
                                        default=json_util.default)
            # NOTE(review): ``connection_manager`` is a module-level name that
            # is only bound when this file runs as a script; calling save()
            # with a non-empty buffer from an importing module raises
            # NameError unless that name has been provided.
            connection_manager.publish_to_queue('latest', latest_message)

            requests = [
                UpdateOne(
                    {'metadata.url': value['metadata']['url']},
                    {
                        '$setOnInsert': {
                            'metadata': value['metadata'],
                            'author': value['author'],
                            'message': value['message']
                        },
                        '$addToSet': {'reports': {"$each": value['reports']}}
                    },
                    upsert=True)
                for value in messages
            ]

            # Persist first, acknowledge afterwards: acking before the write
            # would lose the messages if the write failed.
            self.mongo_manager.message_collection.bulk_write(requests)

            for delivery_tag in self.buffer:
                # Ack to the MQ
                self.ch.basic_ack(delivery_tag=delivery_tag)

            self.buffer.clear()

    def callback(self, ch, method, properties, body):
        """Buffer one delivered message and flush when the buffer is full.

        :param ch: channel the message arrived on; stored so that
            :meth:`save` can acknowledge deliveries on it.
        :param method: delivery metadata; only ``delivery_tag`` is read.
        :param properties: unused.
        :param body: JSON-encoded message, decoded with the BSON object hook.
        """
        with self.lock:
            self.ch = ch
            self.buffer[method.delivery_tag] = json.loads(
                body, object_hook=json_util.object_hook)

            # Writes to the database if the buffer is the correct length.
            # The check is made while still holding the (re-entrant) lock so
            # the size that is tested is the size that gets flushed.
            if (len(self.buffer) >= self.write_buffer_size
                    or not self.buffer_enabled):
                self.save()
if __name__ == '__main__':
    # Script entry point: read configuration from the environment, validate
    # it, then start consuming the 'save' queue with a MongoSave instance.

    # Load the packaged .env file unless DOTENV_LOADED says it was already
    # loaded.
    env_location = pkg_resources.resource_filename('resources', '.env')
    if os.environ.get('DOTENV_LOADED', '0') != '1':
        load_dotenv(env_location)

    # NOTE(review): mongouri and mongodb are read here but not used below;
    # presumably MongoManager reads the same variables itself - confirm.
    mongouri = os.environ.get("MONGODB_URI", "mongodb://localhost:27017/smug")
    mongodb = os.environ.get("MONGODB_DATABASE", "smug")
    write_buffer_size = int(os.environ.get("MONGO_WRITE_BUFFER", 100))
    prefetch_count = int(os.environ.get("PREFETCH_COUNT", 500))

    if write_buffer_size > prefetch_count:
        # Assumes the consumer is limited to PREFETCH_COUNT unacknowledged
        # messages: the buffer could then never reach its flush size.
        raise ValueError(
            'MongoDB write buffer should not exceed prefetch count. '
            'This will cause the buffer to never fill up, so messages '
            'would never be written or acknowledged.')

    mongo_save = MongoSave(write_buffer_size=write_buffer_size,
                           buffer_enabled=True)

    # Bound at module level on purpose: MongoSave.save() looks up the global
    # name ``connection_manager`` to publish the latest message.
    connection_manager = ConnectionManager()
    connection_manager.subscribe_to_queue('save', mongo_save.callback)