From d884ab988511d58e1f49637a02f128aa2625dbbb Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Wed, 8 Feb 2017 19:26:20 -0800 Subject: [PATCH 01/11] support for ifttt --- cool_finance/server.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cool_finance/server.py b/cool_finance/server.py index cfb9f1a..7ab89ae 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -7,6 +7,7 @@ import time from pytz import timezone +import requests from yahoo_finance import Share from cool_finance import constants @@ -23,18 +24,27 @@ def __init__(self, db_client, stock_setting, stop_event): self.targets_list = stock_setting.get("targets_list") self.db_client = db_client self.stopped = stop_event + self.info_obj = Share(self.stock_symbol) def _get_stock_data(self, stock_symbol): - data = Share(stock_symbol) + data = self.info_obj + data.refresh() self.db_client.insert_one(data.data_set, stock_symbol) - return data + return self.info_obj def _get_notice_msg(self, stock_setting, price): - msg = dict(stock_setting) - msg['price'] = price - return msg + # msg = dict(stock_setting) + # msg['price'] = price + # return msg + payload = {'value1': self.stock_symbol, + 'value2': price, + 'value3': "test"} + return payload def _send_nofitication(self, msg): + r = requests.post("https://maker.ifttt.com/trigger/log_update/" + "with/key/cJmpqL9rVFwEOAnNmRAfFn", + data=msg) print msg def run(self): From e9d516c7fd0361a6cf040ddbc8909cfbacdd48f5 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Thu, 9 Feb 2017 01:13:19 -0800 Subject: [PATCH 02/11] add multiple datasources support and googlefinance api --- cool_finance.py | 2 +- cool_finance/constants.py | 5 +- cool_finance/data_sources/__init__.py | 0 cool_finance/data_sources/constants.py | 14 +++ cool_finance/data_sources/manager.py | 19 ++++ cool_finance/data_sources/vendors/__init__.py | 0 cool_finance/data_sources/vendors/common.py | 91 +++++++++++++++++++ .../data_sources/vendors/google_finance.py | 29 ++++++ cool_finance/db.py | 1 - cool_finance/server.py | 38 ++++---- requirements.txt | 2 + ulits/forked_pdb.py | 2 +- 12 files changed, 180 insertions(+), 23 deletions(-) create mode 100644 cool_finance/data_sources/__init__.py create mode 100644 cool_finance/data_sources/constants.py create mode 100644 cool_finance/data_sources/manager.py create mode 100644 cool_finance/data_sources/vendors/__init__.py create mode 100644 cool_finance/data_sources/vendors/common.py create mode 100644 cool_finance/data_sources/vendors/google_finance.py diff --git a/cool_finance.py b/cool_finance.py index 0e8b2c0..74b0282 100644 --- a/cool_finance.py +++ b/cool_finance.py @@ -4,4 +4,4 @@ from cool_finance.server import main if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/cool_finance/constants.py b/cool_finance/constants.py index f15f764..2cd5ea7 100644 --- a/cool_finance/constants.py +++ b/cool_finance/constants.py @@ -5,7 +5,10 @@ CONFIG_FILE = "./cool_finance.json" -START_HOUR_MIN_SEC = (7, 57, 00) +# set START_NOW will override the START_HOUR_MIN_SEC +# and start the server immediately +START_NOW = False +START_HOUR_MIN_SEC = (9, 30, 00) END_HOUR_MIN_SEC = (16, 00, 00) TIMEZONE = 'US/Eastern' # Sat, Sun are closed market day diff --git a/cool_finance/data_sources/__init__.py b/cool_finance/data_sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cool_finance/data_sources/constants.py b/cool_finance/data_sources/constants.py new file mode 100644 index 0000000..a75c0b5 --- /dev/null +++ b/cool_finance/data_sources/constants.py @@ -0,0 +1,14 @@ +# datasource vendor names +BASE_VENDOR = "BASE" +GOOGLE_FINANCE_VENDOR = "google_finance" + +DEFAULT_DATASOURCE = GOOGLE_FINANCE_VENDOR + +INDEX = "index" # NASDAQ, NYSE... +LAST_TRADE_PRICE = "last_trade_price" +LAST_TRADE_DATETIME = "last_trade_datetime" +LAST_TRADE_DATE = "last_trade_date" +LAST_TRADE_TIME = "last_trade_time" +YIELD = "yield" +STOCK_SYMBOL = "stock_symbol" +DIVIDEND = "dividend" diff --git a/cool_finance/data_sources/manager.py b/cool_finance/data_sources/manager.py new file mode 100644 index 0000000..200d103 --- /dev/null +++ b/cool_finance/data_sources/manager.py @@ -0,0 +1,19 @@ +from cool_finance.data_sources import constants as const +from cool_finance.data_sources.vendors import (common, google_finance) + + +class DataSourceManager(object): + + _supported_vendors = { + const.BASE_VENDOR: common.BaseSource, + const.GOOGLE_FINANCE_VENDOR: google_finance.GoogleFinance + } + + def __init__(self, default_vendor=const.DEFAULT_DATASOURCE): + self._default_vendor = default_vendor + + def get_vendor(self, vendor=None): + # return the vendor class + if not vendor: + vendor = self._default_vendor + return self._supported_vendors[vendor] diff --git a/cool_finance/data_sources/vendors/__init__.py b/cool_finance/data_sources/vendors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cool_finance/data_sources/vendors/common.py b/cool_finance/data_sources/vendors/common.py new file mode 100644 index 0000000..b4ddc42 --- /dev/null +++ b/cool_finance/data_sources/vendors/common.py @@ -0,0 +1,91 @@ +import logging + +from cool_finance.data_sources import constants as const + +logger = logging.getLogger(__name__) + + +class BaseSource(object): + + vendor_name = const.BASE_VENDOR + + # _supported_apis = [] + _support_data_json = True + _translate_vendor_specific_key_to_common_key = True + # _data_json_keys = {common_key:vendor_specific_key} + _data_json_keys = { + const.INDEX: const.INDEX, + const.STOCK_SYMBOL: const.STOCK_SYMBOL, + const.LAST_TRADE_PRICE: const.LAST_TRADE_PRICE, + const.LAST_TRADE_DATETIME: const.LAST_TRADE_DATETIME, + const.LAST_TRADE_DATE: const.LAST_TRADE_DATE, + const.LAST_TRADE_TIME: const.LAST_TRADE_TIME, + const.YIELD: const.YIELD, + const.DIVIDEND: const.DIVIDEND + } + _data_json_related_apis = [ + "fetch_data_json", + "refresh_data_json", + "get_data_json", + "get_value_from_data_json" + ] + + def __init__(self, stock_symbol, *args, **kwargs): + self.stock_symbol = stock_symbol + if self._support_data_json: + self._data_json = self.fetch_data_json(self.stock_symbol) + + def __getattr__(self, name): + if (not self._support_data_json and + name in self._data_json_related_apis): + raise NotImplementedError( + "Method %s is not implemented by data source: %s.", + name, self.vendor_name) + else: + return super(BaseSource, self).__getattribute__(name) + + def _key_translate(self, data_json): + if self._translate_vendor_specific_key_to_common_key: + for c_key, v_key in self._data_json_keys.items(): + data_json[c_key] = data_json.pop(v_key) + + return data_json + + def _fetch(self, *args, **kwargs): + # make your vendor specific request here... + # return a json dict + stock_symbol = self.stock_symbol + data_json = {} + for key in self._data_json_keys.keys(): + data_json[key] = "foo" + return data_json + + def fetch_data_json(self, *args, **kwargs): + data_json = self._fetch(args, kwargs) + self._data_json = self._key_translate(data_json) + return self._data_json + + def fetch(self, *args, **kwargs): + return self.fetch_data_json(args, kwargs) + + def refresh_data_json(self, *args, **kwargs): + return self.fetch_data_json(args, kwargs) + + def refresh(self, *args, **kwargs): + return self.refresh_data_json(args, kwargs) + + def get_data_json(self): + return self._data_json + + def get_value_from_data_json(self, key, is_common_key=True): + # if is_common_key, keys must be in self._data_json_keys.keys() + # if not is_common_key, it means vendor specific key + if is_common_key and key not in self._data_json_keys.keys(): + raise KeyError( + "Common key %s is not supported by data source %s.", + key, self.vendor_name) + + return self._data_json[key] + + def get_price(self): + return self._data_json[const.LAST_TRADE_PRICE] diff --git a/cool_finance/data_sources/vendors/google_finance.py b/cool_finance/data_sources/vendors/google_finance.py new file mode 100644 index 0000000..ad0a5ca --- /dev/null +++ b/cool_finance/data_sources/vendors/google_finance.py @@ -0,0 +1,29 @@ +import logging + +from googlefinance import getQuotes + +from cool_finance.data_sources import constants as const +from cool_finance.data_sources.vendors.common import BaseSource + +logger = logging.getLogger(__name__) + + +class GoogleFinance(BaseSource): + + vendor_name = const.GOOGLE_FINANCE_VENDOR + + _data_json_keys = { + const.INDEX: "Index", + const.STOCK_SYMBOL: "StockSymbol", + const.LAST_TRADE_PRICE: "LastTradePrice", + const.LAST_TRADE_DATETIME: "LastTradeDateTime", + const.LAST_TRADE_TIME: "LastTradeTime", + const.YIELD: "Yield", + const.DIVIDEND: "Dividend" + } + + def _fetch(self, *args, **kwargs): + # googlefinance has a bug if the getQuote() input is unicode + # change the type to str explicitly + stock_symbol = self.stock_symbol.encode('utf-8') + return getQuotes(stock_symbol)[0] diff --git a/cool_finance/db.py b/cool_finance/db.py index c53018f..0cfdbb7 100644 --- a/cool_finance/db.py +++ b/cool_finance/db.py @@ -17,4 +17,3 @@ def insert_one(self, data_json, collection_name): collection = self.db[collection_name] id = collection.insert_one(data_json).inserted_id return id - diff --git a/cool_finance/server.py b/cool_finance/server.py index 7ab89ae..62be79b 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -1,22 +1,19 @@ import datetime import json -import signal -import sys from threading import Event from threading import Thread import time from pytz import timezone import requests -from yahoo_finance import Share from cool_finance import constants from cool_finance import db -from ulits.forked_pdb import ForkedPdb +from cool_finance.data_sources.manager import DataSourceManager class Worker(Thread): - def __init__(self, db_client, stock_setting, stop_event): + def __init__(self, db_client, datasource_mgr, stock_setting, stop_event): super(Worker, self).__init__() self.stock_setting = stock_setting self.stock_symbol = stock_setting["name"] @@ -24,12 +21,13 @@ def __init__(self, db_client, stock_setting, stop_event): self.targets_list = stock_setting.get("targets_list") self.db_client = db_client self.stopped = stop_event - self.info_obj = Share(self.stock_symbol) + self.info_obj = datasource_mgr.get_vendor()(self.stock_symbol) def _get_stock_data(self, stock_symbol): data = self.info_obj data.refresh() - self.db_client.insert_one(data.data_set, stock_symbol) + data_set = data.get_data_json() + self.db_client.insert_one(data_set, stock_symbol) return self.info_obj def _get_notice_msg(self, stock_setting, price): @@ -62,9 +60,10 @@ class Server(object): def __init__(self, config_file=constants.CONFIG_FILE): self._reload_config(config_file) - self.db_client = db.Client(constants.DB_HOST, - constants.DB_PORT) - self.worker_stopflag_list = [] + self._db_client = db.Client(constants.DB_HOST, + constants.DB_PORT) + self._datasource_mgr = DataSourceManager() + self._worker_stopflag_list = [] def _reload_config(self, config_file): with open(config_file) as stocks_config: @@ -75,18 +74,19 @@ def _reload_config(self, config_file): def start(self): for stock in self.stocks_list: stopflag = Event() - worker = Worker(self.db_client, stock, stopflag) - self.worker_stopflag_list.append((worker, stopflag)) + worker = Worker(self._db_client, self._datasource_mgr, + stock, stopflag) + self._worker_stopflag_list.append((worker, stopflag)) - for worker, stopflag in self.worker_stopflag_list: + for worker, stopflag in self._worker_stopflag_list: worker.start() def end(self): - for worker, stopflag in self.worker_stopflag_list: + for worker, stopflag in self._worker_stopflag_list: stopflag.set() def wait_all_workers_done(self): - for worker, stopflag in self.worker_stopflag_list: + for worker, stopflag in self._worker_stopflag_list: worker.join() @@ -114,12 +114,12 @@ def get_start_and_end_datetime(start_hour_min_sec=constants.START_HOUR_MIN_SEC, def main(): + server = Server() try: - server = Server() - start_datetime, end_datetime, tz = get_start_and_end_datetime() - while datetime.datetime.now(tz) < start_datetime: - time.sleep(1) + if not constants.START_NOW: + while datetime.datetime.now(tz) < start_datetime: + time.sleep(1) print "start" server.start() diff --git a/requirements.txt b/requirements.txt index 948ef19..8a3fa1f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +googlefinance pymongo pytz +requests yahoo-finance \ No newline at end of file diff --git a/ulits/forked_pdb.py b/ulits/forked_pdb.py index 0b9b59e..6983327 100644 --- a/ulits/forked_pdb.py +++ b/ulits/forked_pdb.py @@ -13,4 +13,4 @@ def interaction(self, *args, **kwargs): sys.stdin = file('/dev/stdin') pdb.Pdb.interaction(self, *args, **kwargs) finally: - sys.stdin = _stdin \ No newline at end of file + sys.stdin = _stdin From 7359a444f16d2731640a666382a14158e2e174eb Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Thu, 9 Feb 2017 01:18:20 -0800 Subject: [PATCH 03/11] add logging support --- cool_finance/constants.py | 11 +++++++++- cool_finance/server.py | 46 +++++++++++++++++++++++++++++++++------ 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/cool_finance/constants.py b/cool_finance/constants.py index 2cd5ea7..1edd7ab 100644 --- a/cool_finance/constants.py +++ b/cool_finance/constants.py @@ -5,9 +5,18 @@ CONFIG_FILE = "./cool_finance.json" +LOG_LEVEL = "INFO" +LOG_FORMAT = '%(asctime)s - %(name)s - %(threadName)s - ' \ + '%(levelname)s - %(message)s' +# DEBUG_LOG_LEVEL must stricter than LOG_LEVEL +# if DEBUG_LOG_FILE is not None, DEBUG_LOG_LEVEL must be set +# if if DEBUG_LOG_FILE is None, DEBUG_LOG_LEVEL would be ignored +DEBUG_LOG_FILE = None +DEBUG_LOG_LEVEL = "DEBUG" + # set START_NOW will override the START_HOUR_MIN_SEC # and start the server immediately -START_NOW = False +START_NOW = True START_HOUR_MIN_SEC = (9, 30, 00) END_HOUR_MIN_SEC = (16, 00, 00) TIMEZONE = 'US/Eastern' diff --git a/cool_finance/server.py b/cool_finance/server.py index 62be79b..c07a836 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -1,5 +1,6 @@ import datetime import json +import logging from threading import Event from threading import Thread import time @@ -11,6 +12,28 @@ from cool_finance import db from cool_finance.data_sources.manager import DataSourceManager +# create logger +logger = logging.getLogger(__name__) +if constants.DEBUG_LOG_FILE: + logger.setLevel(constants.DEBUG_LOG_LEVEL) +else: + logger.setLevel(constants.LOG_LEVEL) +# create console handler with a higher log level +ch = logging.StreamHandler() +ch.setLevel(constants.LOG_LEVEL) +# create formatter and add it to the handlers +formatter = logging.Formatter(constants.LOG_FORMAT) +ch.setFormatter(formatter) +# add the handlers to the logger +logger.addHandler(ch) + +# create file handler which logs even debug messages +if constants.DEBUG_LOG_FILE: + fh = logging.FileHandler(constants.DEBUG_LOG_FILE) + fh.setLevel(constants.DEBUG_LOG_LEVEL) + fh.setFormatter(formatter) + logger.addHandler(fh) + class Worker(Thread): def __init__(self, db_client, datasource_mgr, stock_setting, stop_event): @@ -43,12 +66,13 @@ def _send_nofitication(self, msg): r = requests.post("https://maker.ifttt.com/trigger/log_update/" "with/key/cJmpqL9rVFwEOAnNmRAfFn", data=msg) - print msg + logger.info("Sent notification: %s", msg) def run(self): while not (self.stopped.wait(self.delay) or self.stopped.is_set()): data = self._get_stock_data(self.stock_symbol) price = data.get_price() + logger.debug("Got %s price: $%s", self.stock_symbol, str(price)) for target in self.targets_list: if abs(float(price) - target) <= 0.01: notice_msg = self._get_notice_msg( @@ -107,28 +131,36 @@ def get_start_and_end_datetime(start_hour_min_sec=constants.START_HOUR_MIN_SEC, minute=end_hour_min_sec[1], second=end_hour_min_sec[2]) - delta = start_datetime - current_datetime - print delta - return start_datetime, end_datetime, tz def main(): + logger.info("Welcome to Cool Finance by F.JHL.") server = Server() try: start_datetime, end_datetime, tz = get_start_and_end_datetime() if not constants.START_NOW: + logger.info("The server will start at %s.", start_datetime) + logger.info("The server will end at %s.", end_datetime) + delta = start_datetime - datetime.datetime.now(tz) + logger.info("The server will start %s later.", delta) while datetime.datetime.now(tz) < start_datetime: time.sleep(1) - print "start" + logger.info("The server is going to start.") server.start() + logger.info("The server is already started.") while datetime.datetime.now(tz) < end_datetime: time.sleep(1) - print "end" + logger.info("The server is going to end.") server.end() + logger.info("The server is already end.") except KeyboardInterrupt: - print "KeyboardInterrupt" + logger.info("KeyboardInterrupt: The server is going to end.") server.end() + logger.info("The server is already end.") finally: + logger.info("The server is going to wait for " + "all pending jobs complete.") server.wait_all_workers_done() + logger.info("All jobs completes. Bye.") From d0d1a91fc5e3d0c3aad1d3a26ed3bad1801123d4 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Thu, 9 Feb 2017 12:06:05 -0800 Subject: [PATCH 04/11] log uncaught exception --- cool_finance/server.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cool_finance/server.py b/cool_finance/server.py index c07a836..cd85d35 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -4,6 +4,8 @@ from threading import Event from threading import Thread import time +import traceback +import sys from pytz import timezone import requests @@ -35,6 +37,13 @@ logger.addHandler(fh) +def log_uncaught_exceptions(ex_cls, ex, tb): + logger.error(''.join(traceback.format_tb(tb))) + logger.error('{0}: {1}'.format(ex_cls, ex)) + +sys.excepthook = log_uncaught_exceptions + + class Worker(Thread): def __init__(self, db_client, datasource_mgr, stock_setting, stop_event): super(Worker, self).__init__() From cffd5f3e3fb96f88dc365d0d391303de3a450bc2 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Thu, 9 Feb 2017 13:06:37 -0800 Subject: [PATCH 05/11] add support for optional keys --- cool_finance/data_sources/vendors/common.py | 13 ++++++++++++- cool_finance/data_sources/vendors/google_finance.py | 5 ++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/cool_finance/data_sources/vendors/common.py b/cool_finance/data_sources/vendors/common.py index b4ddc42..764b57e 100644 --- a/cool_finance/data_sources/vendors/common.py +++ b/cool_finance/data_sources/vendors/common.py @@ -13,16 +13,23 @@ class BaseSource(object): _support_data_json = True _translate_vendor_specific_key_to_common_key = True # _data_json_keys = {common_key:vendor_specific_key} + # Those keys are required keys. + # It will raise an KeyError is Data_json misses any of those. _data_json_keys = { const.INDEX: const.INDEX, const.STOCK_SYMBOL: const.STOCK_SYMBOL, const.LAST_TRADE_PRICE: const.LAST_TRADE_PRICE, const.LAST_TRADE_DATETIME: const.LAST_TRADE_DATETIME, const.LAST_TRADE_DATE: const.LAST_TRADE_DATE, - const.LAST_TRADE_TIME: const.LAST_TRADE_TIME, + const.LAST_TRADE_TIME: const.LAST_TRADE_TIME + } + # _data_json_optional_keys are optional keys. + # It will NOT raise an KeyError is Data_json misses any of those. + _data_json_optional_keys = { const.YIELD: const.YIELD, const.DIVIDEND: const.DIVIDEND } + _data_json_related_apis = [ "fetch_data_json", "refresh_data_json", @@ -49,6 +56,10 @@ def _key_translate(self, data_json): for c_key, v_key in self._data_json_keys.items(): data_json[c_key] = data_json.pop(v_key) + for c_key, v_key in self._data_json_optional_keys.items(): + if data_json.get(v_key): + data_json[c_key] = data_json.pop(v_key) + return data_json def _fetch(self, *args, **kwargs): diff --git a/cool_finance/data_sources/vendors/google_finance.py b/cool_finance/data_sources/vendors/google_finance.py index ad0a5ca..005cf2a 100644 --- a/cool_finance/data_sources/vendors/google_finance.py +++ b/cool_finance/data_sources/vendors/google_finance.py @@ -17,7 +17,10 @@ class GoogleFinance(BaseSource): const.STOCK_SYMBOL: "StockSymbol", const.LAST_TRADE_PRICE: "LastTradePrice", const.LAST_TRADE_DATETIME: "LastTradeDateTime", - const.LAST_TRADE_TIME: "LastTradeTime", + const.LAST_TRADE_TIME: "LastTradeTime" + } + + _data_json_optional_keys = { const.YIELD: "Yield", const.DIVIDEND: "Dividend" } From bbeda6c504c5644b1804ee84dbf65d5c0d163fa4 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Fri, 10 Feb 2017 05:23:11 -0800 Subject: [PATCH 06/11] retry query if request fails --- cool_finance/data_sources/vendors/common.py | 2 +- cool_finance/server.py | 14 +++++- cool_finance/tests/__init__.py | 0 cool_finance/tests/test_server.py | 56 +++++++++++++++++++++ 4 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 cool_finance/tests/__init__.py create mode 100644 cool_finance/tests/test_server.py diff --git a/cool_finance/data_sources/vendors/common.py b/cool_finance/data_sources/vendors/common.py index 764b57e..ac39b5f 100644 --- a/cool_finance/data_sources/vendors/common.py +++ b/cool_finance/data_sources/vendors/common.py @@ -40,7 +40,7 @@ class BaseSource(object): def __init__(self, stock_symbol, *args, **kwargs): self.stock_symbol = stock_symbol if self._support_data_json: - self._data_json = self.fetch_data_json(self.stock_symbol) + self._data_json = None def __getattr__(self, name): if (not self._support_data_json and diff --git a/cool_finance/server.py b/cool_finance/server.py index cd85d35..8eb6bc5 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -7,6 +7,11 @@ import traceback import sys +try: + from urllib.error import HTTPError +except ImportError: # python 2 + from urllib2 import HTTPError + from pytz import timezone import requests @@ -57,7 +62,7 @@ def __init__(self, db_client, datasource_mgr, stock_setting, stop_event): def _get_stock_data(self, stock_symbol): data = self.info_obj - data.refresh() + data.fetch() data_set = data.get_data_json() self.db_client.insert_one(data_set, stock_symbol) return self.info_obj @@ -79,7 +84,12 @@ def _send_nofitication(self, msg): def run(self): while not (self.stopped.wait(self.delay) or self.stopped.is_set()): - data = self._get_stock_data(self.stock_symbol) + try: + data = self._get_stock_data(self.stock_symbol) + except HTTPError as err: + logger.warning("Fetch stock info %s failed. Retry later. " + "Reason: %s", self.stock_symbol, err) + continue price = data.get_price() logger.debug("Got %s price: $%s", self.stock_symbol, str(price)) for target in self.targets_list: diff --git a/cool_finance/tests/__init__.py b/cool_finance/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cool_finance/tests/test_server.py b/cool_finance/tests/test_server.py new file mode 100644 index 0000000..72f220d --- /dev/null +++ b/cool_finance/tests/test_server.py @@ -0,0 +1,56 @@ +import mock +from mock import Mock +from mock import patch +import unittest +try: + from urllib.error import HTTPError +except ImportError: # python 2 + from urllib2 import HTTPError + +from cool_finance import server + +sample_stock_setting = { + "name": "QCOM", + "targets_list": [20, 10, 30], + "interval_s": 10 +} + + +class TestWorker(unittest.TestCase): + + def setUp(self): + db_client = Mock() + datasource_mgr = Mock() + stock_setting = sample_stock_setting + stop_event = Mock(return_value=False) + stop_event.wait = Mock(return_value=False) + stop_event.is_set = Mock(return_value=False) + self.patcher = patch("threading.Thread") + self.worker = server.Worker(db_client, datasource_mgr, + stock_setting, stop_event) + + def tearDown(self): + self.worker = None + + def test_run_when_data_fetch_fail(self): + failed_get_stock_data = Mock( + side_effect=HTTPError("", 500, "msg", "hdrs", None)) + server.Worker._get_stock_data = failed_get_stock_data + with patch("logging.Logger.warning") as mock_logging: + mock_logging.side_effect = HTTPError("", 500, "msg", "hdrs", None) + with self.assertRaises(HTTPError): + self.worker.run() + failed_get_stock_data.assert_called() + + def _set_stop_event(self): + self.worker.stopped.is_set = Mock(return_value=True) + return mock.DEFAULT + + def test_run_when_data_fetch_succeed(self): + data = Mock() + data.get_price = Mock(return_value="12345", + side_effect=self._set_stop_event) + successful_get_stock_data = Mock(return_value=data) + server.Worker._get_stock_data = successful_get_stock_data + self.worker.run() + successful_get_stock_data.assert_called() From 278d041307ed0247e4d6089366d0f0df4282cc93 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Fri, 10 Feb 2017 18:34:44 -0800 Subject: [PATCH 07/11] add test-requirements.txt --- test-requirements.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 test-requirements.txt diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 0000000..23ee03b --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1 @@ +mock \ No newline at end of file From 13737822d155ebf850b91160243bc048daa5a704 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Mon, 13 Feb 2017 11:21:55 -0800 Subject: [PATCH 08/11] add continuous run --- cool_finance/server.py | 56 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/cool_finance/server.py b/cool_finance/server.py index 8eb6bc5..bff25e9 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -102,16 +102,19 @@ def run(self): class Server(object): def __init__(self, config_file=constants.CONFIG_FILE): - self._reload_config(config_file) + self._config_file = config_file + self.reload_config(self._config_file) self._db_client = db.Client(constants.DB_HOST, constants.DB_PORT) self._datasource_mgr = DataSourceManager() self._worker_stopflag_list = [] - def _reload_config(self, config_file): + def reload_config(self, config_file=None): + if not config_file: + config_file = self._config_file with open(config_file) as stocks_config: stocks_json = json.load(stocks_config) - # {"stocks": [{"name":, "less_than","greater_than", "interval_s"}]} + # {"stocks": [{"name":"", "targets_list":[int], "interval_s": int}]} self.stocks_list = stocks_json['stocks'] def start(self): @@ -128,9 +131,16 @@ def end(self): for worker, stopflag in self._worker_stopflag_list: stopflag.set() + def restart(self): + self.end() + self.wait_all_workers_done() + self.reload_config() + self.start() + def wait_all_workers_done(self): - for worker, stopflag in self._worker_stopflag_list: + for index, (worker, stopflag) in enumerate(self._worker_stopflag_list): worker.join() + self._worker_stopflag_list.remove(index) def get_start_and_end_datetime(start_hour_min_sec=constants.START_HOUR_MIN_SEC, @@ -157,25 +167,27 @@ def main(): logger.info("Welcome to Cool Finance by F.JHL.") server = Server() try: - start_datetime, end_datetime, tz = get_start_and_end_datetime() - if not constants.START_NOW: - logger.info("The server will start at %s.", start_datetime) - logger.info("The server will end at %s.", end_datetime) - delta = start_datetime - datetime.datetime.now(tz) - logger.info("The server will start %s later.", delta) - while datetime.datetime.now(tz) < start_datetime: + while True: + server.reload_config() + start_datetime, end_datetime, tz = get_start_and_end_datetime() + if not constants.START_NOW: + logger.info("The server will start at %s.", start_datetime) + logger.info("The server will end at %s.", end_datetime) + delta = start_datetime - datetime.datetime.now(tz) + logger.info("The server will start %s later.", delta) + while datetime.datetime.now(tz) < start_datetime: + time.sleep(1) + logger.info("The server is going to start.") + server.start() + logger.info("The server is already started.") + + while datetime.datetime.now(tz) < end_datetime: time.sleep(1) - logger.info("The server is going to start.") - server.start() - logger.info("The server is already started.") - - while datetime.datetime.now(tz) < end_datetime: - time.sleep(1) - logger.info("The server is going to end.") - server.end() - logger.info("The server is already end.") - except KeyboardInterrupt: - logger.info("KeyboardInterrupt: The server is going to end.") + logger.info("The server is going to end.") + server.end() + logger.info("The server is already end.") + except (KeyboardInterrupt, Exception) as err: + logger.info("Exception %s: The server is going to end.", err) server.end() logger.info("The server is already end.") finally: From fb5583981a81ab986675b5e9a6c465284ec19958 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Fri, 17 Feb 2017 02:55:35 -0800 Subject: [PATCH 09/11] improve logging and fix mirror bugs --- cool_finance/log.py | 39 +++++++++++++++++++++++++++ cool_finance/server.py | 60 +++++++++++++----------------------------- 2 files changed, 58 insertions(+), 41 deletions(-) create mode 100644 cool_finance/log.py diff --git a/cool_finance/log.py b/cool_finance/log.py new file mode 100644 index 0000000..23dd5dd --- /dev/null +++ b/cool_finance/log.py @@ -0,0 +1,39 @@ +import datetime +import logging +import traceback +import os +import sys + +from cool_finance import constants + +# create logger +logger = logging.getLogger(__name__) +if constants.DEBUG_LOG_DIR: + logger.setLevel(constants.DEBUG_LOG_LEVEL) +else: + logger.setLevel(constants.LOG_LEVEL) +# create console handler with a higher log level +ch = logging.StreamHandler() +ch.setLevel(constants.LOG_LEVEL) +# create formatter and add it to the handlers +formatter = logging.Formatter(constants.LOG_FORMAT) +ch.setFormatter(formatter) +# add the handlers to the logger +logger.addHandler(ch) + +# create file handler which logs even debug messages +if constants.DEBUG_LOG_DIR: + filename = "debug-" + \ + datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".log" + fh = logging.FileHandler(os.path.join(constants.DEBUG_LOG_DIR, filename)) + fh.setLevel(constants.DEBUG_LOG_LEVEL) + fh.setFormatter(formatter) + logger.addHandler(fh) + + +def log_uncaught_exceptions(ex_cls, ex, tb): + logger.error(''.join(traceback.format_tb(tb))) + logger.error('{0}: {1}'.format(ex_cls, ex)) + +sys.excepthook = log_uncaught_exceptions + diff --git a/cool_finance/server.py b/cool_finance/server.py index bff25e9..e3cceef 100644 --- a/cool_finance/server.py +++ b/cool_finance/server.py @@ -1,52 +1,21 @@ import datetime import json -import logging from threading import Event from threading import Thread import time -import traceback -import sys - -try: - from urllib.error import HTTPError -except ImportError: # python 2 - from urllib2 import HTTPError from pytz import timezone import requests -from cool_finance import constants -from cool_finance import db -from cool_finance.data_sources.manager import DataSourceManager - -# create logger -logger = logging.getLogger(__name__) -if constants.DEBUG_LOG_FILE: - logger.setLevel(constants.DEBUG_LOG_LEVEL) -else: - logger.setLevel(constants.LOG_LEVEL) -# create console handler with a higher log level -ch = logging.StreamHandler() -ch.setLevel(constants.LOG_LEVEL) -# create formatter and add it to the handlers -formatter = logging.Formatter(constants.LOG_FORMAT) -ch.setFormatter(formatter) -# add the handlers to the logger -logger.addHandler(ch) - -# create file handler which logs even debug messages -if constants.DEBUG_LOG_FILE: - fh = logging.FileHandler(constants.DEBUG_LOG_FILE) - fh.setLevel(constants.DEBUG_LOG_LEVEL) - fh.setFormatter(formatter) - logger.addHandler(fh) +from . import constants +from . import db +from .data_sources.manager import DataSourceManager +from .log import logger +# server will be initialized in main() +server = None -def log_uncaught_exceptions(ex_cls, ex, tb): - logger.error(''.join(traceback.format_tb(tb))) - logger.error('{0}: {1}'.format(ex_cls, ex)) -sys.excepthook = log_uncaught_exceptions class Worker(Thread): @@ -59,6 +28,7 @@ def __init__(self, db_client, datasource_mgr, stock_setting, stop_event): self.db_client = db_client self.stopped = stop_event self.info_obj = datasource_mgr.get_vendor()(self.stock_symbol) + self._last_notification_datetime = None def _get_stock_data(self, stock_symbol): data = self.info_obj @@ -86,9 +56,10 @@ def run(self): while not (self.stopped.wait(self.delay) or self.stopped.is_set()): try: data = self._get_stock_data(self.stock_symbol) - except HTTPError as err: + except Exception as err: logger.warning("Fetch stock info %s failed. Retry later. " - "Reason: %s", self.stock_symbol, err) + "Reason: %s", self.stock_symbol, err, + exc_info=True) continue price = data.get_price() logger.debug("Got %s price: $%s", self.stock_symbol, str(price)) @@ -96,7 +67,13 @@ def run(self): if abs(float(price) - target) <= 0.01: notice_msg = self._get_notice_msg( self.stock_setting, price) - self._send_nofitication(notice_msg) + now = datetime.datetime.now() + if (not self._last_notification_datetime or + (now - self._last_notification_datetime). + total_seconds() >= + constants.NOTIFICATION_INTERVAL_S): + self._send_nofitication(notice_msg) + self._last_notification_datetime = now class Server(object): @@ -140,7 +117,7 @@ def restart(self): def wait_all_workers_done(self): for index, (worker, stopflag) in enumerate(self._worker_stopflag_list): worker.join() - self._worker_stopflag_list.remove(index) + self._worker_stopflag_list.pop(index) def get_start_and_end_datetime(start_hour_min_sec=constants.START_HOUR_MIN_SEC, @@ -165,6 +142,7 @@ def get_start_and_end_datetime(start_hour_min_sec=constants.START_HOUR_MIN_SEC, def main(): logger.info("Welcome to Cool Finance by F.JHL.") + global server server = Server() try: while True: From 0317ef4988ad20fa508d060f2bcf95a77707d1f7 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Fri, 17 Feb 2017 03:00:06 -0800 Subject: [PATCH 10/11] add batch query google finance data source - explicitly set stock tickers to UPPERCASE ASCII str --- cool_finance/constants.py | 14 +++- cool_finance/data_sources/constants.py | 3 +- cool_finance/data_sources/manager.py | 12 ++- cool_finance/data_sources/vendors/common.py | 1 + .../data_sources/vendors/google_finance.py | 3 +- .../vendors/google_finance_batch.py | 83 +++++++++++++++++++ 6 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 cool_finance/data_sources/vendors/google_finance_batch.py diff --git a/cool_finance/constants.py b/cool_finance/constants.py index 1edd7ab..9310c13 100644 --- a/cool_finance/constants.py +++ b/cool_finance/constants.py @@ -9,9 +9,9 @@ LOG_FORMAT = '%(asctime)s - %(name)s - %(threadName)s - ' \ '%(levelname)s - %(message)s' # DEBUG_LOG_LEVEL must stricter than LOG_LEVEL -# if DEBUG_LOG_FILE is not None, DEBUG_LOG_LEVEL must be set -# if if DEBUG_LOG_FILE is None, DEBUG_LOG_LEVEL would be ignored -DEBUG_LOG_FILE = None +# if DEBUG_LOG_DIR is not None, DEBUG_LOG_LEVEL must be set +# if if DEBUG_LOG_DIR is None, DEBUG_LOG_LEVEL would be ignored +DEBUG_LOG_DIR = None DEBUG_LOG_LEVEL = "DEBUG" # set START_NOW will override the START_HOUR_MIN_SEC @@ -22,3 +22,11 @@ TIMEZONE = 'US/Eastern' # Sat, Sun are closed market day CLOSED_WEEKDAYS = [5, 6] +# Only one notification will be generated +# every NOTIFICATION_INTERVAL_S seconds for same stock. +NOTIFICATION_INTERVAL_S = 300 +# Guarantee query result is within QUORY_PRECISION_S seconds up to date. +# A significant large number (10x, 100x) can reduce query amount to +# date source server. Tweak it if you have a daily query limit. +# Google data source doesn't seem to have a limit. +QUORY_PRECISION_S = 0.1 diff --git a/cool_finance/data_sources/constants.py b/cool_finance/data_sources/constants.py index a75c0b5..b42db08 100644 --- a/cool_finance/data_sources/constants.py +++ b/cool_finance/data_sources/constants.py @@ -1,8 +1,9 @@ # datasource vendor names BASE_VENDOR = "BASE" GOOGLE_FINANCE_VENDOR = "google_finance" +GOOGLE_FINANCE_BATCH_VENDOR = "google_finance_batch" -DEFAULT_DATASOURCE = GOOGLE_FINANCE_VENDOR +DEFAULT_DATASOURCE = GOOGLE_FINANCE_BATCH_VENDOR INDEX = "index" # NASDAQ, NYSE... LAST_TRADE_PRICE = "last_trade_price" diff --git a/cool_finance/data_sources/manager.py b/cool_finance/data_sources/manager.py index 200d103..9486a11 100644 --- a/cool_finance/data_sources/manager.py +++ b/cool_finance/data_sources/manager.py @@ -1,12 +1,20 @@ from cool_finance.data_sources import constants as const -from cool_finance.data_sources.vendors import (common, google_finance) +from cool_finance.data_sources.vendors import (common, google_finance, + google_finance_batch) class DataSourceManager(object): _supported_vendors = { const.BASE_VENDOR: common.BaseSource, - const.GOOGLE_FINANCE_VENDOR: google_finance.GoogleFinance + const.GOOGLE_FINANCE_VENDOR: google_finance.GoogleFinance, + const.GOOGLE_FINANCE_BATCH_VENDOR: + google_finance_batch.GoogleFinanceBatch + } + + _support_batch_query_vendor = { + const.GOOGLE_FINANCE_BATCH_VENDOR: + google_finance_batch.GoogleFinanceBatch } def __init__(self, default_vendor=const.DEFAULT_DATASOURCE): diff --git a/cool_finance/data_sources/vendors/common.py b/cool_finance/data_sources/vendors/common.py index ac39b5f..b01a9f7 100644 --- a/cool_finance/data_sources/vendors/common.py +++ b/cool_finance/data_sources/vendors/common.py @@ -38,6 +38,7 @@ class BaseSource(object): ] def __init__(self, stock_symbol, *args, **kwargs): + stock_symbol = stock_symbol.encode('utf-8').upper() self.stock_symbol = stock_symbol if self._support_data_json: self._data_json = None diff --git a/cool_finance/data_sources/vendors/google_finance.py b/cool_finance/data_sources/vendors/google_finance.py index 005cf2a..65f0e25 100644 --- a/cool_finance/data_sources/vendors/google_finance.py +++ b/cool_finance/data_sources/vendors/google_finance.py @@ -28,5 +28,4 @@ class GoogleFinance(BaseSource): def _fetch(self, *args, **kwargs): # googlefinance has a bug if the getQuote() input is unicode # change the type to str explicitly - stock_symbol = self.stock_symbol.encode('utf-8') - return getQuotes(stock_symbol)[0] + return getQuotes(self.stock_symbol)[0] diff --git a/cool_finance/data_sources/vendors/google_finance_batch.py b/cool_finance/data_sources/vendors/google_finance_batch.py new file mode 100644 index 0000000..641ac12 --- /dev/null +++ b/cool_finance/data_sources/vendors/google_finance_batch.py @@ -0,0 +1,83 @@ +import datetime +from threading import Lock + +from googlefinance import getQuotes + +from cool_finance.constants import QUORY_PRECISION_S +from cool_finance.data_sources import constants as const +from cool_finance.data_sources.vendors.google_finance import GoogleFinance +from cool_finance.log import logger + +LAST_UPDATE = "last_update" +STOCKS_DATA = "stocks_data" + + +class GoogleFinanceBatchHandler(object): + + _symbol_key = GoogleFinance._data_json_keys[const.STOCK_SYMBOL] + + def __init__(self): + self._stocks_list = [] + # self._stocks_data = {"last_update":datetime, + # "stocks_data": { + # "ABC":{...}}, + # "BCD":{...}} } } + self._stocks_data = {LAST_UPDATE: None, + STOCKS_DATA: {}} + self._data_access_lock = Lock() + + def add_stock(self, stock_symbol): + # googlefinance has a bug if the getQuote() input is unicode + # change the type to str explicitly + self._stocks_list.append(stock_symbol) + + def fetch(self, stock_symbol): + # Multiple works could call fetch() as well as _should_update() + # and _fetch_batch(). Must use the _data_access_lock to guard them. + with self._data_access_lock: + if self._should_update(): + self._stocks_data = self._fetch_batch(self._stocks_list) + logger.debug("Look for %s, new request sent to Google", + stock_symbol) + return dict(self._stocks_data[STOCKS_DATA][stock_symbol]) + + def _should_update(self): + if self._stocks_data[LAST_UPDATE]: + now = datetime.datetime.now() + last_update = self._stocks_data[LAST_UPDATE] + delta = now - last_update + if delta.total_seconds() <= QUORY_PRECISION_S: + return False + return True + + def _fetch_batch(self, stocks_list): + if not stocks_list: + stocks_list = self._stocks_list + quotes_list = getQuotes(stocks_list) + now = datetime.datetime.now() + self._stocks_data[LAST_UPDATE] = now + self._stocks_data[STOCKS_DATA].clear() + for item in quotes_list: + stock_symbol = item[self._symbol_key] + self._stocks_data[STOCKS_DATA][stock_symbol] = item + return self._stocks_data + + +batch_handler = GoogleFinanceBatchHandler() + + +class GoogleFinanceBatch(GoogleFinance): + + vendor_name = const.GOOGLE_FINANCE_BATCH_VENDOR + + def __init__(self, stock_symbol): + super(GoogleFinanceBatch, self).__init__(stock_symbol) + global batch_handler + self.batch_handler = batch_handler + # googlefinance has a bug if the getQuote() input is unicode + # change the type to str explicitly + stock_symbol = stock_symbol.encode('utf-8') + self.batch_handler.add_stock(stock_symbol) + + def _fetch(self, *args, **kwargs): + return self.batch_handler.fetch(self.stock_symbol) From 1c9d02092267e9441880e7c00e69dc5c71cb3e00 Mon Sep 17 00:00:00 2001 From: Jiahao Liang Date: Fri, 17 Feb 2017 03:13:43 -0800 Subject: [PATCH 11/11] fix typo --- cool_finance/constants.py | 4 ++-- cool_finance/data_sources/vendors/google_finance_batch.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cool_finance/constants.py b/cool_finance/constants.py index 9310c13..c263e3f 100644 --- a/cool_finance/constants.py +++ b/cool_finance/constants.py @@ -25,8 +25,8 @@ # Only one notification will be generated # every NOTIFICATION_INTERVAL_S seconds for same stock. NOTIFICATION_INTERVAL_S = 300 -# Guarantee query result is within QUORY_PRECISION_S seconds up to date. +# Guarantee query result is within QUERY_PRECISION_S seconds up to date. # A significant large number (10x, 100x) can reduce query amount to # date source server. Tweak it if you have a daily query limit. # Google data source doesn't seem to have a limit. -QUORY_PRECISION_S = 0.1 +QUERY_PRECISION_S = 0.1 diff --git a/cool_finance/data_sources/vendors/google_finance_batch.py b/cool_finance/data_sources/vendors/google_finance_batch.py index 641ac12..1fc2cb8 100644 --- a/cool_finance/data_sources/vendors/google_finance_batch.py +++ b/cool_finance/data_sources/vendors/google_finance_batch.py @@ -3,7 +3,7 @@ from googlefinance import getQuotes -from cool_finance.constants import QUORY_PRECISION_S +from cool_finance.constants import QUERY_PRECISION_S from cool_finance.data_sources import constants as const from cool_finance.data_sources.vendors.google_finance import GoogleFinance from cool_finance.log import logger @@ -46,7 +46,7 @@ def _should_update(self): now = datetime.datetime.now() last_update = self._stocks_data[LAST_UPDATE] delta = now - last_update - if delta.total_seconds() <= QUORY_PRECISION_S: + if delta.total_seconds() <= QUERY_PRECISION_S: return False return True