smug package

Submodules

smug.callback_helper module

class smug.callback_helper.CallbackExchangeForward(forward_exchange_type=None)[source]

Bases: object

Forward the result of a message after processing to another exchange.

It will not forward when:
  • the result is None
  • forward_exchange_type is None

Examples

>>> @CallbackForward("next_channel_name")
>>> def some_handler(channel, method, properties, body):
>>>     result = do_something(body)
>>>     return result
This example handles the incoming message from RabbitMQ in the do_something method.
After the method has returned a result, it is returned by this method.
CallbackExchangeForward will then forward the result to the exchange named forward_exchange_type.
Parameters:forward_exchange_type (str) – The channel to forward to. This is looked up in ConnectionManager.get_queue_name
class smug.callback_helper.CallbackForward(forward_channel_type=None)[source]

Bases: object

Forward the result of a message after processing to another queue.

It will not forward when:
  • the result is None
  • forward_channel_type is None

Examples

>>> @CallbackForward("next_channel_name")
>>> def some_handler(channel, method, properties, body):
>>>     result = do_something(body)
>>>     return result
This example handles the incoming message from RabbitMQ in the do_something method.
After the method has returned a result, it is returned by this method.
CallbackForward will then forward the result to the channel named forward_channel_type.
Parameters:forward_channel_type (str) – The channel to forward to. This is looked up in ConnectionManager.get_queue_name

smug.connection_manager module

class smug.connection_manager.ConnectionManager(username: str = '', password: str = '', url: str = '', prefetch_count: int = -2)[source]

Bases: object

This manager handles all things related to RabbitMQ. Using this class one can connect to a RabbitMQ instance and not have to remake this code in every other class which needs connection to RabbitMQ.

This class starts a blocking connection meaning that it will keep the connection open as long as the class exists. By default the connection manager will connect to the vhost / and on port 5672

Parameters:
  • username (str, optional) – The username which is used to connect to the RabbitMQ node. If none is provide it will be fetched from the environment file.
  • password (str, optional) – The password which is used to connect to the RabbitMQ node. If none is provide it will be fetched from the environment file.
  • url (str, optional) – The url which is used to connect to the RabbitMQ node. If none is provide it will be fetched from the environment file.
  • prefetch_count (int, optional) – The number of unacknowledged messages a worker can except. This is a natural way of spreading load between workers. If None is provided it will be fetched from the environment file.

Note

The recommended value for prefetch_count is around 500 since this maximises performance when using both a single worker and multiple different workers.

_subscribe(queue_type: str, callback: <built-in function callable>)[source]

Subscribes to a queue and starts consuming. When a new message is received the callback will be executed.

Parameters:
  • queue_type (str) – The queue to subscribe to.The actual queue name will be fetched based on the queue name provided by the get_queue_name function.
  • callback (function) – The function to execute upon receiving a message.

Note

This is a private function and should not be used directly. Use the subscribe_to_queue function instead

static get_exchange_name(exchange_type)[source]

Returns the name of the provided exchange type. Is resolved by the exchanges variable

Examples

>>> exchanges = {
>>>     'process': {'name': '3_process', 'type': 'fanout'}
>>> }
>>> ConnectionManager.get_exchange_name('process')
'3_process'
>>> ConnectionManager.get_queue_name('non_existing_exchange')
KeyError: 'non_existing_exchange'
Parameters:exchange_type – The exchange_type you want to get the name from. Name will be resolved using the exchanges variable.
Returns:Returns the name corresponding to the exchange_type
Return type:str
Raises:KeyError – the provided exchange_type is not present in exchanges
static get_exchanges()[source]

Get’s all the exchanges in the exchanges variable

Returns:returns the exchanges dict.
Return type:dict
static get_queue_name(queue_type: str)[source]

Returns the channel name for the provided queue_type. The queue name is fetched from the queues variable.

Examples

>>> queues = {
>>>     'clean': "1_clean",
>>>     'preprocess': "2_preprocess",
>>>     'process_wordvec': json.loads('{"name":"3_process_wordvec","exchange":"3_process"}'),
>>>     'process_location': json.loads('{"name":"3_process_location","exchange":"3_process"}'),
>>>     'save': '4_save',
>>> }
>>> ConnectionManager.get_queue_name('clean')
'1_clean'
>>> ConnectionManager.get_queue_name('process_wordvec')
'3_process_wordvec'
>>> ConnectionManager.get_queue_name('non_existing_queue')
KeyError: 'non_existing_queue'
Parameters:queue_type (str) – The queue_type you want to get the name from. Name will be resolved using the queues variable.
Returns:queue_name if successful

If the queue_type is present in queues returns the corresponding queue name. If the queue_type is one with a exchange binding returns the name.

Return type:str
Raises:KeyError – the provided queue_type is not present in queues
static get_queues()[source]

Get’s all the queues in the queues variable

Returns:returns the queue dict.
Return type:dict
publish_to_queue(queue_type: str, message: str)[source]

Sends a message to a queue.

Parameters:
  • queue_type (str) – The queue to send the message to. The actual queue name will be fetched based on the queue name provided by the get_queue_name function.
  • message (str) – The message to publish to the queue.
subscribe_to_queue(queue_type: str, callback: <built-in function callable>)[source]

Subscribe to a queue. Once a message is received it wil be passed to the callback which will then be executed. Uses the _subscribe function underwater.

Examples

>>> def callback(ch, method, properties, body):
>>>     print('Got message {}'.format(body))
>>> # Create a connection manager and subscribe to the test queue
>>> connection_manager = ConnectionManager()
>>> connection_manager.subscribe_to_queue('clean', callback)
>>> # Send a message to test the callback is working.
>>> connection_manager.publish_to_queue('clean', 'test')
'Got message test'
Parameters:
  • queue_type – The queue to subscribe to.The actual queue name will be fetched based on the queue name provided by the get_queue_name function.
  • callback – The function to execute upon receiving a message.
smug.connection_manager.try_parse_json(json_text: str)[source]

smug.mongo_manager module

class smug.mongo_manager.MongoConfig(mongo_url: str = '', database: str = '', port: int = -1, message_collection_name: str = '', report_collection_name: str = '')[source]

Bases: object

MongoDB configuration object. This object can be used to create a custom configuration for the MongoDB connection.

Examples

>>> from smug.mongo_manager import MongoConfig, MongoManager
>>> mongo_config = MongoConfig(port=12345)

This will create a config with all of the default values from the environment file with the exception of the port which will be 12345. This config can then be used with the MongoManager in order to use this custom config.

>>> mongo_manager = MongoManager(config=mongo_config)
Parameters:
  • mongo_url (str, optional) – The url of the collection to connect to. If no value is provided value will be read from MONGO_URI variable in the environment file.
  • database (str, optional) – Name of the database to connect to. If no value is provided value will be read from MONGO_DATABASE variable in the environment file.
  • port (int, optional) – The port to connect to. If ‘no value is provided the port will be the default port for mongo db this is port 27017
  • message_collection_name (str, optional) – Name of the collection storing the message. If no value is provided value will be read from MONGO_MESSAGES_DATABASE variable in the environment file.
  • report_collection_name (str, optional) – Name of the collection storing the reports. If no value is provided value will be read from MONGO_REPORT_DATABASE variable in the environment file.

Note

If no parameters are passed the default values from the environment file will be used.

class smug.mongo_manager.MongoManager(config: smug.mongo_manager.MongoConfig = <smug.mongo_manager.MongoConfig object>)[source]

Bases: object

Class for connecting to MongoDB server. This class ensures that all MongoDB connections have the correct properties and configs. This class should be used when connecting to a MongoDB database.

Examples

>>> from smug.mongo_manager import MongoManager
>>> mongo_manager = MongoManager()

Now that we have our MongoManager we can use it’s message_collection attribute in order to interact with our message collection.

>>> mongo_manager.message_collection.insert('test')
Parameters:config (MongoConfig, optional) – The configuration used by the manager. For available options see MongoConfig
get_reports()[source]

Retrieves active reports from the report_collection

Returns:Cursor containing all the enabled reports in the reports_collection.
Return type:pymongo.cursor.Cursor

smug.run module

smug.run.run()[source]
smug.run.signal_handler(signal, frame)[source]

smug.send_to_smug_helper module

class smug.send_to_smug_helper.SendToSmugHelper[source]

Bases: object

A wrapper class that takes care of sending any sort of data to smug

hash_username(username)[source]

Module contents