python-gotosocial-notif/main.py

178 lines
5.2 KiB
Python

# %%
# ZED Python REPL
import asyncio
import html
import os
import re
from datetime import datetime

import requests
import telegram
from dotenv import load_dotenv
from sqlalchemy import desc, update
from sqlalchemy.exc import IntegrityError

from models import Notifikasi, Session
# Settings for the requests sent to the GoToSocial API
# ("kauaku" in the original note — presumably the instance name; confirm).
# load_dotenv() comes from python-dotenv; the values below are then read
# from the process environment and are None when a variable is missing.
load_dotenv()
# URL queried by getNotif(); "?limit=5" is appended to it there.
GTS_API = os.getenv("GTS_API")
# Token sent as a Bearer credential in the Authorization header.
GTS_TOKEN = os.getenv("GTS_TOKEN")
# Token handed to telegram.Bot below.
TELE_BOT = os.getenv("TELE_BOT")
# chat_id used by kirim_ke_tele() when calling bot.send_message().
TELE_CHATID = os.getenv("TELE_CHATID")
headers = {"Authorization": f"Bearer {GTS_TOKEN}"}
# One module-level session shared by getNotif(), kirim_ke_tele() and
# tandai_notif(); each of them calls session.close() when it is done.
session = Session()
# NOTE: the f-string turns a missing TELE_BOT into the literal string "None".
bot = telegram.Bot(token=f"{TELE_BOT}")
def _notif_fields(notif):
    """Map one notification dict from the API to ``Notifikasi`` keyword args.

    Args:
        notif: One element of the JSON list returned by the notifications
            endpoint. It must contain ``id``, ``type``, ``created_at`` and
            ``account`` (with ``acct`` and ``display_name``).

    Returns:
        A dict of keyword arguments for ``Notifikasi``. For every type other
        than ``"follow"`` it also carries ``inreplyto``, ``status`` and
        ``url``, which are ``None`` when the notification has no status.

    Raises:
        KeyError: If a required key is missing.
        ValueError: If ``created_at`` does not match the expected format.
    """
    notif_type = notif["type"]
    account = notif["account"]
    fields = {
        "post_id": notif["id"],
        "created_at": datetime.strptime(
            notif["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"
        ),
        "handler": account["acct"],
        "display_name": account["display_name"],
        "type": notif_type,
        # "Belum" (Indonesian: "not yet") marks the row as not forwarded yet.
        "remark": "Belum",
    }
    if notif_type != "follow":
        # ``status`` may be absent or null; fall back to an empty dict so the
        # three status-derived columns simply become None.
        status = notif.get("status") or {}
        fields.update(
            inreplyto=status.get("in_reply_to_id"),
            status=status.get("content"),
            url=status.get("url"),
        )
    return fields


async def getNotif():
    """Fetch the five latest notifications and store the ones not seen yet.

    Each notification is committed on its own. A duplicate ``post_id``
    therefore only skips that single row instead of aborting (or rolling
    back) the rest of the batch.

    NOTE: ``requests`` is synchronous, so the HTTP call blocks the event loop
    while it waits; the timeout keeps that wait bounded.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    req = requests.get(f"{GTS_API}?limit=5", headers=headers, timeout=30)
    if req.status_code != 200:
        print("Gagal mengambil notifikasi, status code:", req.status_code)
        return

    try:
        for notif in req.json():
            session.add(Notifikasi(**_notif_fields(notif)))
            try:
                session.commit()
            except IntegrityError as err:
                # Roll back only this row; rows committed earlier are kept
                # and the loop carries on with the next notification.
                session.rollback()
                if "post_id" in str(err.orig):
                    print("Post Id sudah ada")
                else:
                    print("Integritas data bermasalah", str(err))
    finally:
        session.close()
# Action line shown under the sender for each known notification type.
_NOTIF_LABELS = {
    "mention": "💬 mention your note",
    "reblog": "🚀 boost your note",
    "favourite": "💖 falling in love with your note",
    "follow": "✋ started to follow you",
}


def _status_to_text(status_html):
    """Turn a status HTML fragment into text that is safe in Telegram HTML mode.

    Paragraph ends become blank lines, ``<br>`` variants become newlines and
    every other tag is dropped. Entities are decoded and the result is
    re-escaped, so only ``&amp;``, ``&lt;`` and ``&gt;`` reach Telegram.

    Args:
        status_html: The stored status content, or ``None``.

    Returns:
        The escaped plain text, or ``""`` when there is no content.
    """
    if not status_html:
        return ""
    text = re.sub(r"</p\s*>", "\n\n", str(status_html))
    text = re.sub(r"<br\s*/?>", "\n", text)
    text = re.sub(r"<[^>]+>", "", text)
    # Strip tags before decoding so a literal "&lt;b&gt;" written by the
    # author is not mistaken for markup.
    return html.escape(html.unescape(text).strip(), quote=False)


def _build_message(display_name, handler, notif_type, status_html, url):
    """Compose the Telegram HTML message for one notification.

    Args:
        display_name: Sender display name (escaped before use).
        handler: Sender account handle (escaped before use).
        notif_type: Notification type; unknown types are shown verbatim.
        status_html: Status content as HTML; ignored for ``"follow"``.
        url: Link to the status; the "source" link is omitted when falsy.

    Returns:
        The message text, wrapped in a leading and a trailing newline like
        the original triple-quoted template.
    """
    label = _NOTIF_LABELS.get(notif_type, html.escape(str(notif_type)))
    header = (
        f"<b>{html.escape(str(display_name))}</b>\n"
        f"<i>{html.escape(str(handler))}</i>\n"
        f"{label}"
    )
    if notif_type == "follow":
        return f"\n{header}\n"

    parts = [header, _status_to_text(status_html)]
    if url:
        parts.append(f'<a href="{html.escape(str(url), quote=True)}">source</a>')
    return "\n" + "\n\n".join(parts) + "\n"


async def kirim_ke_tele():
    """Send up to ten pending notifications to the Telegram chat.

    Pending rows are those whose ``remark`` is ``"Belum"``, the same value
    ``tandai_notif()`` matches on; the newest rows are sent first. A failed
    send is printed and skipped so the remaining rows are still delivered.
    This function does not change ``remark`` itself.
    """
    try:
        pending = (
            session.query(Notifikasi)
            .filter(Notifikasi.remark == "Belum")
            .order_by(desc(Notifikasi.id))
            .limit(10)
            .all()
        )
        for notif in pending:
            notif_type = str(notif.type)
            text = _build_message(
                notif.display_name,
                notif.handler,
                notif_type,
                notif.status,
                notif.url,
            )
            try:
                await bot.send_message(
                    parse_mode="html",
                    chat_id=str(TELE_CHATID),
                    text=text,
                )
            except Exception as e:
                # Best-effort delivery: report the failure and keep going.
                if notif_type == "follow":
                    print("Follow: Ada masalah saat kirim ke Telebot", e)
                else:
                    print("Mentions: Ada masalah saat kirim ke Telebot", e)
    finally:
        session.close()
async def tandai_notif():
    """Mark every pending notification as handled.

    Rewrites ``remark`` from "Belum" ("not yet") to "Sudah" ("done") for all
    matching rows in one UPDATE and commits it. Any error is printed rather
    than raised, and the shared session is closed either way.

    NOTE(review): this touches every "Belum" row, whether or not
    ``kirim_ke_tele()`` managed to deliver it.
    """
    try:
        mark_done = (
            update(Notifikasi)
            .where(Notifikasi.remark == "Belum")
            .values(remark="Sudah")
        )
        session.execute(mark_done)
        session.commit()
    except Exception as exc:
        print("Ada masalah saat update remark", exc)
    finally:
        session.close()
async def jalan():
    """Run one fetch -> send -> mark cycle, pausing before each stage.

    The pauses are 2 s before fetching, 5 s before sending and 5 s before
    marking, and the stages run strictly one after another.
    """
    stages = (
        (2, getNotif),
        (5, kirim_ke_tele),
        (5, tandai_notif),
    )
    for pause, stage in stages:
        await asyncio.sleep(pause)
        await stage()


# Script entry point: run a single cycle and exit.
if __name__ == "__main__":
    asyncio.run(jalan())