ruff: fix something

This commit is contained in:
Kusaeni 2024-10-01 11:32:29 +07:00
parent 92623b7369
commit 2f29314faa
3 changed files with 94 additions and 60 deletions

View file

@ -16,7 +16,7 @@ App ini masih dalam pengembangan dan banyak hal yang belum bisa dilakukan, berik
- [x] Boost/Reblog - [x] Boost/Reblog
- [x] Follow - [x] Follow
- [x] Tandai data di database jika sudah terkirim - [x] Tandai data di database jika sudah terkirim
- [ ] Delay antar fungsi - [x] Delay antar fungsi
- [ ] Cron internal - [ ] Cron internal
- [ ] Build - [ ] Build

141
main.py
View file

@ -1,128 +1,160 @@
# %% # %%
# ZED Python REPL # ZED Python REPL
import requests
import os
import logging
import asyncio import asyncio
import telegram import os
import re import re
import json
from models import Notifikasi, Session
from sqlalchemy import and_, or_, func, update
from sqlalchemy.exc import IntegrityError
from dotenv import load_dotenv
from datetime import datetime from datetime import datetime
import requests
import telegram
from dotenv import load_dotenv
from sqlalchemy import update
from sqlalchemy.exc import IntegrityError
from models import Notifikasi, Session
# Request ke kauaku # Request ke kauaku
load_dotenv() load_dotenv()
gts_api = os.getenv('GTS_API') gts_api = os.getenv("GTS_API")
gts_token = os.getenv('GTS_TOKEN') gts_token = os.getenv("GTS_TOKEN")
headers = {"Authorization": f"Bearer {gts_token}"} headers = {"Authorization": f"Bearer {gts_token}"}
session = Session() session = Session()
TELE_BOT = os.getenv('TELE_BOT') TELE_BOT = os.getenv("TELE_BOT")
TELE_CHATID = os.getenv('TELE_CHATID') TELE_CHATID = os.getenv("TELE_CHATID")
bot = telegram.Bot(token=f'{TELE_BOT}') bot = telegram.Bot(token=f"{TELE_BOT}")
async def getNotif(): async def getNotif():
req = requests.get(f'{gts_api}?limit=5', headers=headers) req = requests.get(f"{gts_api}?limit=5", headers=headers)
if req.status_code == 200: if req.status_code == 200:
data = req.json() data = req.json()
print(data.__class__) print(data.__class__)
try: try:
for notif in data: for notif in data:
type = notif['type'] type = notif["type"]
inreply = None inreply = None
status_content = None status_content = None
url = None url = None
if 'status' in notif: if "status" in notif:
inreply = notif['status']['in_reply_to_id'] inreply = notif["status"]["in_reply_to_id"]
status_content = notif['status']['content'] status_content = notif["status"]["content"]
url = notif['status']['url'] url = notif["status"]["url"]
postid = notif['id'] postid = notif["id"]
created = datetime.strptime(notif['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ") created = datetime.strptime(
handler = notif['account']['acct'] notif["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"
username = notif['account']['display_name'] )
kirim = 'Belum' handler = notif["account"]["acct"]
username = notif["account"]["display_name"]
kirim = "Belum"
if type == 'follow': if type == "follow":
insert_follow = Notifikasi(post_id=postid, created_at=created, handler=handler, display_name=username, type=type, remark=kirim) insert_follow = Notifikasi(
post_id=postid,
created_at=created,
handler=handler,
display_name=username,
type=type,
remark=kirim,
)
session.add(insert_follow) session.add(insert_follow)
else: else:
insert_mentions = Notifikasi(post_id=postid, created_at=created, handler=handler, display_name=username, type=type, status=status_content,url=url, remark=kirim) insert_mentions = Notifikasi(
inreplyto=inreply,
post_id=postid,
created_at=created,
handler=handler,
display_name=username,
type=type,
status=status_content,
url=url,
remark=kirim,
)
session.add(insert_mentions) session.add(insert_mentions)
session.commit() session.commit()
except IntegrityError as err: except IntegrityError as err:
session.rollback() session.rollback()
if 'post_id' in str(err.orig): if "post_id" in str(err.orig):
print("Post Id sudah ada") print("Post Id sudah ada")
else: else:
print(f"Integritas data bermasalah", str(err)) print("Integritas data bermasalah", str(err))
finally: finally:
session.close() session.close()
async def kirim_ke_tele(): async def kirim_ke_tele():
cek_data = session.query(Notifikasi).filter(Notifikasi.remark.like('%elu%')).limit(10).all() cek_data = (
session.query(Notifikasi)
.filter(Notifikasi.remark.like("%elu%"))
.limit(10)
.all()
)
for notif in cek_data: for notif in cek_data:
def icon_(type_): def icon_(type_):
if type_ == 'mention': if type_ == "mention":
return '💬 mention your note' return "💬 mention your note"
elif type_ == 'reblog': elif type_ == "reblog":
return '🚀 boost your note' return "🚀 boost your note"
elif type_ == 'favourite': elif type_ == "favourite":
return '💖 falling in love with your note' return "💖 falling in love with your note"
elif type_ == 'follow': elif type_ == "follow":
return '✋ started to follow you' return "✋ started to follow you"
else: else:
return None return None
flag = icon_(notif.type) flag = icon_(notif.type)
if str(notif.type) == 'follow': if str(notif.type) == "follow":
try: try:
await bot.send_message(parse_mode='html',chat_id=str(TELE_CHATID), text=f""" await bot.send_message(
parse_mode="html",
chat_id=str(TELE_CHATID),
text=f"""
<b>{notif.display_name}</b>\n<i>{notif.handler}</i>\n{flag}\n from python <b>{notif.display_name}</b>\n<i>{notif.handler}</i>\n{flag}\n from python
""" """,
) )
except Exception as e: except Exception as e:
print(f'Follow: Ada masalah saat kirim ke Telebot', e) print("Follow: Ada masalah saat kirim ke Telebot", e)
else: else:
html_strip = re.compile('<.*?>') html_strip = re.compile("<.*?>")
status = str(notif.status) status = str(notif.status)
status_text = re.sub(html_strip,'', status) status_text = re.sub(html_strip, "", status)
try: try:
await bot.send_message(parse_mode='html',chat_id=str(TELE_CHATID), text=f""" await bot.send_message(
parse_mode="html",
chat_id=str(TELE_CHATID),
text=f"""
<b>{notif.display_name}</b>\n<i>{notif.handler}</i>\n{flag}\n\n{status_text}\nfrom python\n{notif.url} <b>{notif.display_name}</b>\n<i>{notif.handler}</i>\n{flag}\n\n{status_text}\nfrom python\n{notif.url}
""" """,
) )
except Exception as e: except Exception as e:
print(f'Mentions: Ada masalah saat kirim ke Telebot', e) print("Mentions: Ada masalah saat kirim ke Telebot", e)
session.close() session.close()
async def tandai_notif(): async def tandai_notif():
try: try:
cek_data = session.query(Notifikasi).filter(Notifikasi.remark == 'Belum').all()
session.execute( session.execute(
update(Notifikasi). update(Notifikasi)
where(Notifikasi.remark == 'Belum'). .where(Notifikasi.remark == "Belum")
values(remark='Sudah') .values(remark="Sudah")
) )
session.commit() session.commit()
except Exception as e: except Exception as e:
print(f'Ada masalah saat update remark', e) print("Ada masalah saat update remark", e)
finally: finally:
session.close() session.close()
async def jalan(): async def jalan():
await asyncio.sleep(2) await asyncio.sleep(2)
await getNotif() await getNotif()
@ -131,6 +163,7 @@ async def jalan():
await asyncio.sleep(5) await asyncio.sleep(5)
await tandai_notif() await tandai_notif()
# #
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(jalan()) asyncio.run(jalan())

View file

@ -1,20 +1,20 @@
# %% # %%
# ZED Python REPL # ZED Python REPL
from enum import unique from sqlalchemy import Column, Date, Integer, String, create_engine
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Date
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Engine # Engine
engine = create_engine('sqlite:///gotodon.db') engine = create_engine("sqlite:///gotodon.db")
# Base # Base
Base = declarative_base() Base = declarative_base()
# Models # Models
class Notifikasi(Base): class Notifikasi(Base):
__tablename__ = 'notifikasi' __tablename__ = "notifikasi"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
inreplyto = Column(String) inreplyto = Column(String)
@ -30,6 +30,7 @@ class Notifikasi(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<{self.id} : {self.handler} >" return f"<{self.id} : {self.handler} >"
# Buat tabel # Buat tabel
Base.metadata.create_all(engine) Base.metadata.create_all(engine)