-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathrun.py
129 lines (106 loc) · 4.06 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""Sample class to testrun"""
import json
import asyncio
from pathlib import Path
from pylamarzocco.devices.base import LaMarzoccoBaseDevice
from pylamarzocco.devices.machine import LaMarzoccoMachine
from pylamarzocco.const import MachineModel
from pylamarzocco.clients.bluetooth import LaMarzoccoBluetoothClient
from pylamarzocco.clients.cloud import LaMarzoccoCloudClient
from pylamarzocco.clients.local import LaMarzoccoLocalClient
async def main():
"""Main function."""
with open(f"{Path(__file__).parent}/secrets.json", encoding="utf-8") as f:
data = json.load(f)
# local_client = LaMarzoccoLocalClient(
# host=data["host"],
# local_bearer=data["token"],
# )
# def print_msg(msg):
# print(msg)
# await local_client.websocket_connect(print_msg)
# await asyncio.sleep(60)
cloud_client = LaMarzoccoCloudClient(
username=data["username"],
password=data["password"],
)
fleet = await cloud_client.get_customer_fleet()
# serial = list(fleet.keys())[0]
# local_client = LaMarzoccoLocalClient(
# host=data["host"],
# local_bearer=data["token"],
# )
# bluetooth_devices = await LaMarzoccoBluetoothClient.discover_devices()
# bluetooth_client = None
# if bluetooth_devices:
# print("Found bluetooth device:", bluetooth_devices[0])
# bluetooth_client = LaMarzoccoBluetoothClient(
# data["username"],
# data["serial"],
# data["token"],
# bluetooth_devices[0],
# )
# machine = await LaMarzoccoMachine.create(
# model=LaMarzoccoMachineModel(fleet[serial].model_name),
# serial_number=fleet[serial].serial_number,
# name=fleet[serial].name,
# # cloud_client=cloud_client,
# local_client=local_client,
# )
machine = await LaMarzoccoMachine.create(
model=MachineModel(data["model"]),
serial_number=data["serial"],
name=data["serial"],
cloud_client=cloud_client,
# local_client=local_client,
# bluetooth_client=bluetooth_client,
)
await machine.get_config()
# await machine.websocket_connect()
# await asyncio.sleep(300)
# lmcloud = await LMCloud.create_with_local_api(creds, data["host"], data["port"])
# await lmcloud.set_power("standby")
# lmcloud.local_get_config()
# await lmcloud.set_steam(True)
# await lmcloud.set_coffee_temp(93.5)
# await lmcloud.set_steam_temp(131)
# print(await lmcloud.get_coffee_boiler_enabled())
# await lmcloud.get_status()
# config = await lmcloud.get_config()
# await lmcloud.set_steam(False)
# await lmcloud.set_auto_on_off("Monday", 13, 15, 16, 15)
# await lmcloud.set_auto_on_off_enable("Monday", False)
# current_status = await lmcloud._get_status()
# await lmcloud.set_steam(True)
# current_status = lmcloud.current_status
# await lmcloud.set_prebrew(False)
# config = await lmcloud.get_config()
# await asyncio.sleep(10)
# await lmcloud.update_local_machine_status()
# print("Entering loop...")
# while True:
# # print(lmcloud._lm_local_api._timestamp_last_websocket_msg)
# if lmcloud.current_status["active_brew"]:
# print("Brewing")
# await asyncio.sleep(1)
# await machine.set_power(True)
# await asyncio.sleep(5)
# await machine.set_power(False)
# while True:
# print("waiting...")
# # print(lmcloud._lm_local_api._timestamp_last_websocket_msg)
# # if lmcloud.current_status["brew_active"]:
# # print("Brewing")#
# # await lmcloud.update_local_machine_status()
# await asyncio.sleep(5)
# print(lmcloud._lm_bluetooth._address)
# await lmcloud.set_power(True)
# await lmcloud.set_steam(False)
# await lmcloud.set_coffee_temp(93.5)
# await lmcloud.set_steam_temp(128)
# await lmcloud.set_power(False)
# await lmcloud.set_auto_on_off("thu", 14, 15, 16, 15)
# await lmcloud.set_auto_on_off_enable("thu", False)
print(str(machine))
print("Done.")
asyncio.run(main())