98 lines
3 KiB
Python
98 lines
3 KiB
Python
|
# %%
|
||
|
# ZED Python REPL
|
||
|
|
||
|
import requests
|
||
|
import os
|
||
|
import logging
|
||
|
import asyncio
|
||
|
import telegram
|
||
|
from json import dumps
|
||
|
from models import Notifikasi, Session
|
||
|
from sqlalchemy import and_, or_, func
|
||
|
from sqlalchemy.exc import IntegrityError
|
||
|
from dotenv import load_dotenv
|
||
|
from datetime import datetime
|
||
|
|
||
|
# Request ke kauaku
|
||
|
load_dotenv()
|
||
|
gts_api = os.getenv('GTS_API')
|
||
|
gts_token = os.getenv('GTS_TOKEN')
|
||
|
|
||
|
headers = {"Authorization": f"Bearer {gts_token}"}
|
||
|
|
||
|
session = Session()
|
||
|
|
||
|
TELE_BOT = os.getenv('TELE_BOT')
|
||
|
TELE_CHATID = os.getenv('TELE_CHATID')
|
||
|
bot = telegram.Bot(token=f'{TELE_BOT}')
|
||
|
|
||
|
def getNotif():
|
||
|
req = requests.get(f'{gts_api}?limit=5', headers=headers)
|
||
|
|
||
|
if req.status_code == 200:
|
||
|
data = req.json()
|
||
|
try:
|
||
|
for notif in data:
|
||
|
type = notif['type']
|
||
|
inreply = notif['status']['in_reply_to_id']
|
||
|
postid = notif['id']
|
||
|
created = datetime.strptime(notif['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||
|
handler = notif['account']['acct']
|
||
|
username = notif['account']['display_name']
|
||
|
status = notif['status']['content']
|
||
|
url = notif['status']['url']
|
||
|
kirim = 'Belum'
|
||
|
|
||
|
if type == 'follow':
|
||
|
insert_follow = Notifikasi(post_id=postid, created_at=created, handler=handler, display_name=username, type=type, remark=kirim)
|
||
|
session.add(insert_follow)
|
||
|
else:
|
||
|
insert_mentions = Notifikasi(post_id=postid, created_at=created, handler=handler, display_name=username, type=type, status=status,url=url, remark=kirim)
|
||
|
|
||
|
session.add(insert_mentions)
|
||
|
session.commit()
|
||
|
|
||
|
except IntegrityError as err:
|
||
|
session.rollback()
|
||
|
if 'post_id' in str(err.orig):
|
||
|
print("Post Id sudah ada")
|
||
|
else:
|
||
|
print(f"Integritas data bermasalah", str(err))
|
||
|
finally:
|
||
|
session.close()
|
||
|
|
||
|
|
||
|
async def kirim_ke_tele():
|
||
|
cek_data = session.query(Notifikasi).filter(Notifikasi.remark.like('%elu%')).limit(10).all()
|
||
|
|
||
|
|
||
|
for notif in cek_data:
|
||
|
body = {
|
||
|
'text': 'text',
|
||
|
'chat_id': {str(TELE_CHATID)},
|
||
|
'parse_mode': 'markdown',
|
||
|
'text': f"{notif.post_id} : {notif.handler}"
|
||
|
}
|
||
|
|
||
|
if str(notif.type) == 'Follow':
|
||
|
try:
|
||
|
req = requests.post(f"https://api.telegram.org/bot{TELE_BOT}/sendMessage", headers=headers, data="fuck")
|
||
|
except:
|
||
|
print('Ada masalah saat kirim ke Telebot')
|
||
|
else:
|
||
|
try:
|
||
|
await bot.send_message(chat_id=str(TELE_CHATID), text=f"""
|
||
|
{notif.display_name}\n{notif.handler}
|
||
|
|
||
|
{notif.status}
|
||
|
"""
|
||
|
)
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f'Ada masalah saat kirim ke Telebot1', e)
|
||
|
|
||
|
|
||
|
|
||
|
# getNotif()
|
||
|
asyncio.run(kirim_ke_tele())
|