Source code for smug.importers.streaming_importer

import locale
from argparse import ArgumentParser

import pkg_resources
import json
from bson import json_util
import os
import tweepy
from dotenv import load_dotenv

from send_to_smug_helper import SendToSmugHelper
from smug.connection_manager import ConnectionManager


[docs]class StreamingListener(tweepy.StreamListener): def __init__(self): super().__init__() env_location = pkg_resources.resource_filename('resources', '.env') if os.environ.get('DOTENV_LOADED', '0') != '1': load_dotenv(env_location) consumer_token = os.environ.get("TWITTER_CONSUMER_TOKEN", "TOKEN") consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "SECRET") access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "TOKEN") access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "SECRET") auth = tweepy.OAuthHandler(consumer_token, consumer_secret) auth.set_access_token(access_token, access_token_secret) stream = tweepy.Stream(auth=auth, listener=self) stream.filter(locations=[3.31497114423, 50.803721015, 7.09205325687, 53.5104033474]) print(stream.running)
[docs] @SendToSmugHelper() def on_status(self, original_message): if original_message.lang == 'nl': return { 'message': original_message.text, 'author': original_message.author.name, 'metadata': { 'date': original_message.created_at, 'url': original_message.id, 'type': 'post', 'source': 'twitter', 'source_import': 'live_twitter', 'lang': locale.normalize('{}.utf-8'.format(original_message.lang)) } }
if __name__ == '__main__': StreamingListener()