# %% # ZED Python REPL import asyncio import os import re from datetime import datetime import requests import telegram from dotenv import load_dotenv from sqlalchemy import desc, update from sqlalchemy.exc import IntegrityError from models import Notifikasi, Session # Request ke kauaku load_dotenv() GTS_API = os.getenv("GTS_API") GTS_TOKEN = os.getenv("GTS_TOKEN") TELE_BOT = os.getenv("TELE_BOT") TELE_CHATID = os.getenv("TELE_CHATID") headers = {"Authorization": f"Bearer {GTS_TOKEN}"} session = Session() bot = telegram.Bot(token=f"{TELE_BOT}") async def getNotif(): req = requests.get(f"{GTS_API}?limit=5", headers=headers) if req.status_code == 200: data = req.json() try: for notif in data: type = notif["type"] inreply = None status_content = None url = None if "status" in notif: inreply = notif["status"]["in_reply_to_id"] status_content = notif["status"]["content"] url = notif["status"]["url"] postid = notif["id"] created = datetime.strptime( notif["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ" ) handler = notif["account"]["acct"] username = notif["account"]["display_name"] kirim = "Belum" if type == "follow": insert_follow = Notifikasi( post_id=postid, created_at=created, handler=handler, display_name=username, type=type, remark=kirim, ) session.add(insert_follow) else: insert_mentions = Notifikasi( inreplyto=inreply, post_id=postid, created_at=created, handler=handler, display_name=username, type=type, status=status_content, url=url, remark=kirim, ) session.add(insert_mentions) session.commit() except IntegrityError as err: session.rollback() if "post_id" in str(err.orig): print("Post Id sudah ada") else: print("Integritas data bermasalah", str(err)) finally: session.close() async def kirim_ke_tele(): cek_data = ( session.query(Notifikasi) .filter(Notifikasi.remark.like("%elu%")) .order_by(desc(Notifikasi.id)) .limit(10) .all() ) for notif in cek_data: def icon_(type_): if type_ == "mention": return "šŸ’¬ mention your note" elif type_ == "reblog": return "šŸš€ boost your note" elif type_ == "favourite": return "šŸ’– falling in love with your note" elif type_ == "follow": return "āœ‹ started to follow you" else: return None flag = icon_(notif.type) if str(notif.type) == "follow": try: await bot.send_message( parse_mode="html", chat_id=str(TELE_CHATID), text=f""" {notif.display_name}\n{notif.handler}\n{flag} """, ) except Exception as e: print("Follow: Ada masalah saat kirim ke Telebot", e) else: # html_strip = re.compile("<.*?>") # status = str(notif.status) # status_text = re.sub(html_strip, "", status) raw = str(notif.status) # raw = raw.strip() raw = raw.replace("

", "", 1) raw = raw.rsplit("

", 1)[0] rubah_p = re.sub(r"

", "\n\n", raw) rubah_br = re.sub(r"
", "\n", rubah_p) status_text = re.sub(r"<.*?>", "", rubah_br) try: await bot.send_message( parse_mode="html", chat_id=str(TELE_CHATID), text=f""" {notif.display_name}\n{notif.handler}\n{flag}\n\nā{status_text}āž\n\nsource """, ) except Exception as e: print("Mentions: Ada masalah saat kirim ke Telebot", e) session.close() async def tandai_notif(): try: session.execute( update(Notifikasi) .where(Notifikasi.remark == "Belum") .values(remark="Sudah") ) session.commit() except Exception as e: print("Ada masalah saat update remark", e) finally: session.close() async def jalan(): await asyncio.sleep(2) await getNotif() await asyncio.sleep(5) await kirim_ke_tele() await asyncio.sleep(5) await tandai_notif() # if __name__ == "__main__": asyncio.run(jalan())