smug package
Subpackages
Submodules
smug.callback_helper module
class smug.callback_helper.CallbackExchangeForward(forward_exchange_type=None)
Bases: object
Forward the result of a message after processing to another exchange.
It will not forward when:
- the result is None
- forward_exchange_type is None
Examples
>>> @CallbackExchangeForward("next_exchange_name")
... def some_handler(channel, method, properties, body):
...     result = do_something(body)
...     return result
This example handles the incoming message from RabbitMQ in the do_something function. Once do_something returns a result, some_handler returns it, and CallbackExchangeForward then forwards it to the exchange named by forward_exchange_type.
Parameters: forward_exchange_type (str) – The exchange type to forward to. This is looked up in ConnectionManager.get_queue_name.
class smug.callback_helper.CallbackForward(forward_channel_type=None)
Bases: object
Forward the result of a message after processing to another queue.
It will not forward when:
- the result is None
- forward_channel_type is None
Examples
>>> @CallbackForward("next_channel_name")
... def some_handler(channel, method, properties, body):
...     result = do_something(body)
...     return result
This example handles the incoming message from RabbitMQ in the do_something function. Once do_something returns a result, some_handler returns it, and CallbackForward then forwards it to the channel named by forward_channel_type.
Parameters: forward_channel_type (str) – The channel to forward to. This is looked up in ConnectionManager.get_queue_name.
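The forwarding rule shared by both decorators can be illustrated with a minimal sketch. This is not smug's actual implementation: the class name ForwardingCallback and the publish hook are hypothetical, introduced only so the example runs without a RabbitMQ connection.

```python
import functools


class ForwardingCallback:
    """Hypothetical stand-in for CallbackForward; not smug's actual code."""

    def __init__(self, forward_type=None, publish=None):
        self.forward_type = forward_type
        # `publish` is an assumed hook taking (forward_type, result).
        # It defaults to a no-op so the sketch never needs a broker.
        self.publish = publish or (lambda target, message: None)

    def __call__(self, handler):
        @functools.wraps(handler)
        def wrapper(channel, method, properties, body):
            result = handler(channel, method, properties, body)
            # Forward only when there is both a result and a target.
            if result is not None and self.forward_type is not None:
                self.publish(self.forward_type, result)
            return result

        return wrapper


sent = []  # records what would have been published


def record(target, message):
    sent.append((target, message))


@ForwardingCallback("next_channel_name", publish=record)
def upper_handler(channel, method, properties, body):
    # Returns None for an empty body, which suppresses forwarding.
    return body.upper() if body else None


@ForwardingCallback(None, publish=record)
def unrouted_handler(channel, method, properties, body):
    return body


upper_handler(None, None, None, "hello")     # forwarded
upper_handler(None, None, None, "")          # result is None: not forwarded
unrouted_handler(None, None, None, "hello")  # no target: not forwarded
print(sent)
```

Only the first call is recorded, because the other two hit one of the two "will not forward" conditions listed above.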
smug.connection_manager module
class smug.connection_manager.ConnectionManager(username: str = '', password: str = '', url: str = '', prefetch_count: int = -2)
Bases: object
This manager handles everything related to RabbitMQ. Use this class to connect to a RabbitMQ instance instead of repeating the connection code in every class that needs RabbitMQ access.
This class starts a blocking connection, meaning that the connection stays open for as long as the instance exists. By default the connection manager connects to the vhost / on port 5672.
Parameters:
- username (str, optional) – The username used to connect to the RabbitMQ node. If none is provided, it is fetched from the environment file.
- password (str, optional) – The password used to connect to the RabbitMQ node. If none is provided, it is fetched from the environment file.
- url (str, optional) – The URL used to connect to the RabbitMQ node. If none is provided, it is fetched from the environment file.
- prefetch_count (int, optional) – The number of unacknowledged messages a worker can accept. This is a natural way of spreading load between workers. If none is provided, it is fetched from the environment file.
Note
The recommended value for prefetch_count is around 500 since this maximises performance when using both a single worker and multiple different workers.
_subscribe(queue_type: str, callback: callable)
Subscribes to a queue and starts consuming. When a new message is received, the callback is executed.
Parameters:
- queue_type (str) – The queue to subscribe to. The actual queue name is resolved by the get_queue_name function.
- callback (function) – The function to execute upon receiving a message.
Note
This is a private function and should not be used directly. Use the subscribe_to_queue function instead.
static get_exchange_name(exchange_type)
Returns the name of the provided exchange type. The name is resolved using the exchanges variable.
Examples
>>> exchanges = {
...     'process': {'name': '3_process', 'type': 'fanout'}
... }
>>> ConnectionManager.get_exchange_name('process')
'3_process'
>>> ConnectionManager.get_exchange_name('non_existing_exchange')
KeyError: 'non_existing_exchange'
Parameters: exchange_type – The exchange type whose name you want. The name is resolved using the exchanges variable.
Returns: The name corresponding to the exchange_type.
Return type: str
Raises: KeyError – the provided exchange_type is not present in exchanges
static get_exchanges()
Gets all the exchanges in the exchanges variable.
Returns: The exchanges dict.
Return type: dict
static get_queue_name(queue_type: str)
Returns the queue name for the provided queue_type. The queue name is fetched from the queues variable.
Examples
>>> queues = {
...     'clean': "1_clean",
...     'preprocess': "2_preprocess",
...     'process_wordvec': json.loads('{"name":"3_process_wordvec","exchange":"3_process"}'),
...     'process_location': json.loads('{"name":"3_process_location","exchange":"3_process"}'),
...     'save': '4_save',
... }
>>> ConnectionManager.get_queue_name('clean')
'1_clean'
>>> ConnectionManager.get_queue_name('process_wordvec')
'3_process_wordvec'
>>> ConnectionManager.get_queue_name('non_existing_queue')
KeyError: 'non_existing_queue'
Parameters: queue_type (str) – The queue type whose name you want. The name is resolved using the queues variable.
Returns: The queue name. If the queue_type is present in queues, the corresponding queue name is returned. If the queue_type has an exchange binding, the value of its name field is returned.
Return type: str
Raises: KeyError – the provided queue_type is not present in queues
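The two entry shapes in queues (a plain string, or a dict carrying an exchange binding) suggest a lookup along the following lines. This is a sketch under assumptions, not the library's code: resolve_queue_name is a hypothetical helper, and the registry is a shortened copy of the example above.

```python
import json

# Shortened copy of the example registry; entries are either a plain
# queue name or a dict describing a queue bound to an exchange.
queues = {
    "clean": "1_clean",
    "process_wordvec": json.loads(
        '{"name":"3_process_wordvec","exchange":"3_process"}'
    ),
    "save": "4_save",
}


def resolve_queue_name(queue_type: str) -> str:
    """Hypothetical helper mirroring the documented lookup behaviour."""
    entry = queues[queue_type]  # raises KeyError for unknown queue types
    if isinstance(entry, dict):
        # Exchange-bound queue: the queue name lives under "name".
        return entry["name"]
    return entry


print(resolve_queue_name("clean"))
print(resolve_queue_name("process_wordvec"))
```

Letting the KeyError propagate matches the documented Raises behaviour and keeps a misconfigured queue type from silently resolving to a default.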
static get_queues()
Gets all the queues in the queues variable.
Returns: The queues dict.
Return type: dict
subscribe_to_queue(queue_type: str, callback: callable)
Subscribe to a queue. Once a message is received, it is passed to the callback, which is then executed. Uses the _subscribe function under the hood.
Examples
>>> def callback(ch, method, properties, body):
...     print('Got message {}'.format(body))
>>> # Create a connection manager and subscribe to the test queue
>>> connection_manager = ConnectionManager()
>>> connection_manager.subscribe_to_queue('clean', callback)
>>> # Send a message to test the callback is working.
>>> connection_manager.publish_to_queue('clean', 'test')
'Got message test'
Parameters:
- queue_type – The queue to subscribe to. The actual queue name is resolved by the get_queue_name function.
- callback – The function to execute upon receiving a message.
smug.mongo_manager module
class smug.mongo_manager.MongoConfig(mongo_url: str = '', database: str = '', port: int = -1, message_collection_name: str = '', report_collection_name: str = '')
Bases: object
MongoDB configuration object. This object can be used to create a custom configuration for the MongoDB connection.
Examples
>>> from smug.mongo_manager import MongoConfig, MongoManager
>>> mongo_config = MongoConfig(port=12345)
This creates a config with all of the default values from the environment file, with the exception of the port, which will be 12345. This config can then be passed to the MongoManager in order to use the custom config.
>>> mongo_manager = MongoManager(config=mongo_config)
Parameters:
- mongo_url (str, optional) – The URL of the MongoDB server to connect to. If no value is provided, it is read from the MONGO_URI variable in the environment file.
- database (str, optional) – Name of the database to connect to. If no value is provided, it is read from the MONGO_DATABASE variable in the environment file.
- port (int, optional) – The port to connect to. If no value is provided, the default MongoDB port, 27017, is used.
- message_collection_name (str, optional) – Name of the collection storing the messages. If no value is provided, it is read from the MONGO_MESSAGES_DATABASE variable in the environment file.
- report_collection_name (str, optional) – Name of the collection storing the reports. If no value is provided, it is read from the MONGO_REPORT_DATABASE variable in the environment file.
Note
If no parameters are passed the default values from the environment file will be used.
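The fallback rules above (empty string or -1 means "use the default") can be sketched as follows. How smug actually loads its environment file is not shown in this reference, so resolve_mongo_settings and its env mapping are hypothetical; only the variable names MONGO_URI and MONGO_DATABASE and the default port 27017 come from the parameter descriptions. The URL and database values are made up for the example.

```python
import os

DEFAULT_MONGO_PORT = 27017  # default port stated in the parameter list


def resolve_mongo_settings(mongo_url="", database="", port=-1, env=None):
    """Hypothetical helper: fill unset values from an environment mapping."""
    # Assumption: the environment file has already been loaded into a
    # mapping; os.environ is used when no mapping is passed in.
    env = os.environ if env is None else env
    return {
        # An empty string is treated as "not provided".
        "mongo_url": mongo_url or env.get("MONGO_URI", ""),
        "database": database or env.get("MONGO_DATABASE", ""),
        # -1 is the "not provided" sentinel for the port.
        "port": port if port != -1 else DEFAULT_MONGO_PORT,
    }


# Example values only; they are not taken from smug's configuration.
example_env = {"MONGO_URI": "mongodb://localhost", "MONGO_DATABASE": "smug"}

custom = resolve_mongo_settings(port=12345, env=example_env)
defaults = resolve_mongo_settings(env=example_env)
print(custom)
print(defaults)
```

Passing the mapping explicitly keeps the sketch independent of whatever is set in the real process environment.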
class smug.mongo_manager.MongoManager(config: smug.mongo_manager.MongoConfig = <smug.mongo_manager.MongoConfig object>)
Bases: object
Class for connecting to a MongoDB server. This class ensures that all MongoDB connections have the correct properties and configs, and should be used whenever connecting to a MongoDB database.
Examples
>>> from smug.mongo_manager import MongoManager
>>> mongo_manager = MongoManager()
Now that we have our MongoManager, we can use its message_collection attribute to interact with our message collection.
>>> mongo_manager.message_collection.insert('test')
Parameters: config (MongoConfig, optional) – The configuration used by the manager. For available options see MongoConfig.