-
Notifications
You must be signed in to change notification settings - Fork 10
/
main.py
192 lines (148 loc) · 5.89 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import base64
import os
import openai
import requests
from urllib.parse import urlparse
from deta import Base, Drive
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Template
from pydantic import BaseModel
# Request body accepted by POST /authorize: the chat ID to add to the
# "chat_ids" allow-list stored in CONFIG.
class New_ID(BaseModel):
    # Chat ID to authorize; declared as int on the pydantic model.
    new_id: int
# FastAPI application; the local "public" directory is served under /public.
app = FastAPI()
app.mount("/public", StaticFiles(directory="public"), name="public")
# Deta Drive that receives generated images when no valid Blackhole URL is set.
PHOTOS = Drive("generations")
# Deta Base for app configuration; the "chat_ids" record holds authorized chats.
CONFIG = Base("config")
# Credentials come from the environment; each value is None when unset.
BOT_KEY = os.getenv("TELEGRAM")
OPEN_AI_KEY = os.getenv("OPEN_AI")
# Base URL for every Telegram Bot API call made by this module.
BOT_URL = f"https://api.telegram.org/bot{BOT_KEY}"
# NOTE(review): not referenced anywhere else in this file; image generation
# goes through the openai client instead.
OPEN_AI_URL = "https://api.openai.com/v1/images/generations"
# Optional upload target; when it parses as a URL, images are posted there
# instead of being stored in PHOTOS.
BLACKHOLE_URL = os.getenv("BLACKHOLE")
def is_valid_url(url):
    """Return True when *url* has both a scheme and a network location.

    Args:
        url: Candidate URL. Anything that is not ``str`` or ``bytes``
            (for example ``None`` from an unset environment variable)
            is treated as invalid.

    Returns:
        bool: ``True`` only if parsing succeeds and both the scheme and
        netloc components are non-empty.
    """
    if not isinstance(url, (str, bytes)):
        return False
    try:
        parsed_url = urlparse(url)
    except ValueError:
        # urlparse rejects malformed netlocs (e.g. an unbalanced "[");
        # that is an invalid URL, not a reason to crash at import time.
        return False
    return bool(parsed_url.scheme and parsed_url.netloc)
# True when either API key is missing or still holds the placeholder text.
env_error = any(
    not key or key == "enter your key"
    for key in (BOT_KEY, OPEN_AI_KEY)
)
# Whether BLACKHOLE_URL is usable as an upload target.
bh_validity = is_valid_url(BLACKHOLE_URL)
# Hand the OpenAI key to the client library once, at import time.
openai.api_key = OPEN_AI_KEY
def get_image_from_prompt(prompt):
    """Generate one 512x512 image for *prompt* through the OpenAI image API.

    Returns:
        dict: ``{"b64img": <base64 string>, "created": <timestamp>}`` on
        success, or ``{"error": <message>}`` when the API reports an error
        or any exception is raised while calling it.
    """
    try:
        result = openai.Image.create(
            prompt=prompt, n=1, size="512x512", response_format="b64_json"
        )
        # Error payloads are reported first; everything else is a success.
        if "error" in result:
            return {"error": result["error"]["message"]}  # type: ignore
        first_image = result["data"][0]  # type: ignore
        return {
            "b64img": first_image["b64_json"],
            "created": result["created"],  # type: ignore
        }
    except Exception as exc:
        # Any failure is surfaced to the caller as an error message.
        return {"error": str(exc)}  # type: ignore
def save_and_send_img(b64img, chat_id, prompt, timestamp):
    """Send a generated image to the chat, then archive it.

    The image is posted to the bot's ``sendPhoto`` endpoint with the prompt
    as its caption. Afterwards it is uploaded to ``BLACKHOLE_URL`` when
    ``bh_validity`` is true, otherwise stored in ``PHOTOS`` under
    ``"{timestamp} - {prompt}.png"``.

    Args:
        b64img: Base64-encoded image data.
        chat_id: Chat that receives the photo.
        prompt: Text used as the caption and in the stored filename.
        timestamp: Value used as the filename prefix.

    Returns:
        dict: ``{"chat_id": chat_id, "caption": prompt}``.
    """
    image_data = base64.b64decode(b64img)
    photo_payload = {"photo": image_data}
    # Pass chat_id/caption through ``params`` so they are URL-encoded; a
    # prompt containing "&", "#" or "+" would otherwise corrupt the query.
    requests.post(
        f"{BOT_URL}/sendPhoto",
        params={"chat_id": chat_id, "caption": prompt},
        files=photo_payload,
    )
    # The response bodies are not used, so they are not parsed: a non-JSON
    # reply must not raise after the photo has already been delivered.
    if bh_validity:
        requests.post(BLACKHOLE_URL, files=photo_payload)
    else:
        PHOTOS.put(f"{timestamp} - {prompt}.png", image_data)
    return {"chat_id": chat_id, "caption": prompt}
def send_error(chat_id, error_message):
    """Send *error_message* to *chat_id* as a text message.

    Returns:
        The decoded JSON body of the ``sendMessage`` response.
    """
    reply = requests.post(
        f"{BOT_URL}/sendMessage",
        json={"text": error_message, "chat_id": chat_id},
    )
    return reply.json()
def get_webhook_info():
    """Query the bot's ``getWebhookInfo`` endpoint and return the decoded JSON."""
    reply = requests.get(f"{BOT_URL}/getWebhookInfo")
    return reply.json()
@app.get("/")
def home():
    """Render the landing page, or redirect to /setup when configuration is missing.

    Returns:
        ``RedirectResponse`` to ``/setup`` when the API keys are not
        configured or no webhook URL is registered; otherwise the index
        template rendered with status ``"READY"`` (webhook registered) or
        ``"ERROR"`` (webhook info has no usable ``result``).
    """
    # Check the keys first: without them the webhook query would only hit
    # the Bot API with an invalid token before redirecting anyway.
    if env_error:
        return RedirectResponse("/setup")
    # Context manager so the template file handle is always closed.
    with open("index.html", encoding="utf-8") as template_file:
        home_template = Template(template_file.read())
    response = get_webhook_info()
    result = response.get("result") if isinstance(response, dict) else None
    if isinstance(result, dict):
        # A missing or empty "url" both mean that no webhook is registered.
        if not result.get("url"):
            return RedirectResponse("/setup")
        return HTMLResponse(home_template.render(status="READY"))
    return HTMLResponse(home_template.render(status="ERROR"))
@app.get("/setup")
def setup():
    """Render the setup page.

    Returns:
        The index template rendered with status ``"SETUP_ENVS"`` when the
        API keys are not configured, otherwise with status
        ``"SETUP_WEBHOOK"`` and ``blackhole_url`` set to the
        ``https://<hostname>`` of ``BLACKHOLE_URL`` (empty string when that
        URL is not valid).
    """
    # Context manager so the template file handle is always closed.
    with open("index.html", encoding="utf-8") as template_file:
        home_template = Template(template_file.read())
    if env_error:
        return HTMLResponse(home_template.render(status="SETUP_ENVS"))
    # Only needed for the webhook step, so computed after the env check.
    blackhole_app_url = (
        f"https://{urlparse(BLACKHOLE_URL).hostname}" if bh_validity else ""
    )
    return HTMLResponse(
        home_template.render(status="SETUP_WEBHOOK", blackhole_url=blackhole_app_url)
    )
@app.get("/authorize")
def auth():
    """Render the authorization page listing the currently authorized chat IDs.

    Returns:
        The index template rendered with status ``"AUTH"``; ``chat_ids`` is
        the ``"value"`` of the ``"chat_ids"`` config record, or ``None``
        when that record does not exist.
    """
    authorized_chat_ids = CONFIG.get("chat_ids")
    # Context manager so the template file handle is always closed.
    with open("index.html", encoding="utf-8") as template_file:
        home_template = Template(template_file.read())
    chat_ids = (
        None
        if authorized_chat_ids is None
        else authorized_chat_ids.get("value")  # type: ignore
    )
    return HTMLResponse(home_template.render(status="AUTH", chat_ids=chat_ids))
@app.post("/authorize")
def add_auth(item: New_ID):
    """Add ``item.new_id`` to the "chat_ids" record in CONFIG.

    Creates the record with a one-element list when it does not exist yet;
    otherwise appends to the existing record's ``"value"``.
    """
    record_exists = CONFIG.get("chat_ids") is not None
    if record_exists:
        CONFIG.update(
            updates={"value": CONFIG.util.append(item.new_id)},
            key="chat_ids",
        )
    else:
        CONFIG.put(data=[item.new_id], key="chat_ids")
    return
def _send_message(chat_id, text, parse_mode=None):
    """Post a text message to *chat_id* via the bot's sendMessage endpoint."""
    payload = {"text": text, "chat_id": chat_id}
    if parse_mode is not None:
        payload["parse_mode"] = parse_mode
    requests.post(f"{BOT_URL}/sendMessage", json=payload)


@app.post("/open")
async def http_handler(request: Request):
    """Handle an incoming webhook update from Telegram.

    Behaviour by message text:
      * ``/chat-id`` or ``/chatid``: reply with the chat's ID.
      * ``/start`` or ``/help``: reply with the welcome text.
      * anything else: if the chat is in the "chat_ids" allow-list, generate
        an image for the text and send it; otherwise reply that the chat is
        not authorized.

    Updates that carry no ``message``, or a message without text or chat ID,
    are logged and acknowledged without a reply. There is no chat to answer,
    and raising would make this endpoint return an error for that update.

    Returns:
        The result of ``save_and_send_img`` on success, the ``send_error``
        response when generation fails, or ``None`` for every other path.
    """
    # NOTE(review): the helpers below use blocking ``requests`` calls inside
    # an ``async def`` handler.
    incoming_data = await request.json()
    message = incoming_data.get("message") if isinstance(incoming_data, dict) else None
    if not isinstance(message, dict):
        print(incoming_data)
        return
    chat = message.get("chat")
    chat_id = chat.get("id") if isinstance(chat, dict) else None
    prompt = message.get("text")
    if chat_id is None or not prompt:
        # E.g. a photo or sticker: nothing to use as a prompt.
        print(incoming_data)
        return
    if prompt in ["/chat-id", "/chatid"]:
        _send_message(chat_id, f"```{chat_id}```", parse_mode="MarkdownV2")
        return
    if prompt in ["/start", "/help"]:
        response_text = (
            "welcome to telemage. to generate an image with ai,"
            " send me a prompt or phrase and i'll do some magic."
        )
        _send_message(chat_id, response_text)
        return
    # The allow-list is only needed once we know this is a generation request.
    authorized_chat_ids = CONFIG.get("chat_ids")
    # A missing record or a missing/None "value" both mean nobody is authorized.
    allowed_ids = (authorized_chat_ids or {}).get("value") or []  # type: ignore
    if chat_id not in allowed_ids:
        _send_message(
            chat_id,
            "you're not authorized. contact this bot's admin to authorize.",
        )
        return
    open_ai_resp = get_image_from_prompt(prompt)
    if "b64img" in open_ai_resp:
        return save_and_send_img(
            open_ai_resp["b64img"], chat_id, prompt, open_ai_resp["created"]
        )
    # Only failed generations reach this point; log the prompt for debugging.
    print(prompt)
    if "error" in open_ai_resp:
        return send_error(chat_id, open_ai_resp["error"])
    return send_error(chat_id, "Unknown error, lol, handling coming soon")
@app.get("/set_webhook")
def url_setter():
    """Register this app's ``/open`` endpoint as the bot's webhook.

    The public hostname is read from ``DETA_SPACE_APP_HOSTNAME``.

    Returns:
        The decoded JSON response of the webhook registration call, or
        ``{"ok": False, "description": ...}`` when the hostname variable is
        not set.
    """
    hostname = os.getenv("DETA_SPACE_APP_HOSTNAME")
    if not hostname:
        # Without this guard the webhook would be set to "https://None/open".
        return {
            "ok": False,
            "description": "DETA_SPACE_APP_HOSTNAME is not set; cannot build the webhook URL",
        }
    # ``params`` URL-encodes the webhook address inside the query string.
    resp = requests.get(
        f"{BOT_URL}/setWebHook",
        params={"url": f"https://{hostname}/open"},
    )
    return resp.json()