-
Notifications
You must be signed in to change notification settings - Fork 0
/
daily_mapping_check.py
36 lines (30 loc) · 1.35 KB
/
daily_mapping_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from mes_module import SourceDataProcess
from datetime import datetime, timedelta
from query_data_module import ConnectToMongo
import pandas as pd
class DailyCheck(ConnectToMongo):
    """Run the source-data mapping on recently moved-out WIP lots of one fab.

    On construction the instance queries the ``wip_lot`` collection for
    documents whose ``FAB_ID`` is ``'S3'`` and whose ``MOVE_OUT_TIME`` lies in
    the five days before "now", and stores them in ``wip_df``.
    ``mapping_check`` then hands that frame to ``SourceDataProcess``.

    The class reads ``self.db``, ``self.mongo_remove``, ``self.mongo_import``
    and ``self.bulk_write`` but defines none of them. They are presumably
    supplied by ``ConnectToMongo`` (imported at the top of the file and
    otherwise unused), which is why it is the base class here.
    TODO confirm that ``ConnectToMongo.__init__`` takes no arguments.
    """

    def __init__(self) -> None:
        """Set the query parameters and load the matching WIP documents."""
        # With no base class this call only reached object.__init__, so
        # self.db never existed and get_wip() failed with AttributeError.
        super().__init__()
        self.tbname = 'wip_lot'
        self.fab = 'S3'
        # Sample the clock once so the window is exactly five days wide.
        # NOTE(review): datetime.now() is naive local time, while
        # mapping_check() labels the stored timestamps as UTC -- confirm
        # which clock the collection uses before changing this.
        now = datetime.now()
        self.start_time = now - timedelta(days=5)
        self.end_time = now
        # Timestamp columns that mapping_check() marks as UTC.
        self.time_col = ["MOVE_IN_TIME", "MOVE_OUT_TIME"]
        # Runs the Mongo query immediately; needs every attribute above.
        self.wip_df = self.get_wip()

    def get_wip(self) -> pd.DataFrame:
        """Return the ``wip_lot`` documents inside the query window.

        Documents are matched on ``FAB_ID == self.fab`` and
        ``self.start_time <= MOVE_OUT_TIME <= self.end_time``; the ``_id``
        field is projected away.

        Returns:
            A DataFrame built from the matching documents. It is empty when
            the aggregation yields nothing.
        """
        collection = self.db[self.tbname]
        pipeline = [
            {
                "$match": {
                    "FAB_ID": self.fab,
                    "MOVE_OUT_TIME": {
                        "$gte": self.start_time,
                        "$lte": self.end_time,
                    },
                },
            },
            {"$project": {"_id": 0}},
        ]
        return pd.DataFrame(list(collection.aggregate(pipeline)))

    def mapping_check(self) -> None:
        """Mark the time columns as UTC and run ``SourceDataProcess`` on them.

        Does nothing when ``wip_df`` is empty. The columns listed in
        ``time_col`` are updated in place on ``wip_df``.

        Raises:
            KeyError: if a column named in ``time_col`` is missing.
        """
        if self.wip_df.empty:
            return

        for column in self.time_col:
            # tz_localize() raises TypeError on tz-aware data. Only label
            # naive columns so a repeated call (or a tz-aware client) is safe.
            if self.wip_df[column].dt.tz is None:
                self.wip_df[column] = self.wip_df[column].dt.tz_localize("UTC")

        source_data_process = SourceDataProcess(
            self.wip_df,
            self.db,
            self.mongo_remove,
            self.mongo_import,
            self.bulk_write,
        )
        # "main_funtion" (sic): kept exactly as the original call spells it,
        # presumably matching the method name in mes_module.
        source_data_process.main_funtion()
# Script entry point. Constructing DailyCheck already runs the Mongo query
# (its __init__ calls get_wip()); mapping_check() then passes the loaded frame
# to SourceDataProcess.
# NOTE(review): these statements are not under an `if __name__ == "__main__":`
# guard, so they also execute whenever this module is imported -- confirm that
# nothing imports it before adding one.
daily_check = DailyCheck()
daily_check.mapping_check()