# Source: https://gist.github.com/mkmark/91e2a8a3b848a064773d75df597e7071
# %% import
import json
import datetime
import dateutil.tz
import logging
import io
import requests
import urllib.parse
import pickle
import os
import atexit
# %% logger
# Root logger accepts everything from DEBUG up; each handler then filters at
# INFO.  Two sinks are attached, in this order:
#   1. the console (default StreamHandler stream), with a verbose layout;
#   2. an in-memory buffer holding bare messages, read back at exit time.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

log_stringIO = io.StringIO()
handler_console = logging.StreamHandler()
handler_string = logging.StreamHandler(log_stringIO)

for _sink, _layout in (
    (handler_console, '%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
    (handler_string, '%(message)s'),
):
    # `formatter` is rebound on each pass and ends up as the message-only
    # formatter used by the in-memory handler.
    formatter = logging.Formatter(_layout)
    _sink.setLevel(logging.INFO)
    _sink.setFormatter(formatter)
    logger.addHandler(_sink)
# %% jsstm
class Jsstm():
    """Client for the jsstm.jszwfw.gov.cn user-auth and daily-attendance endpoints.

    All requests go through one ``requests`` session.  Cookies saved by a
    previous run in the local file ``jsstm_cookie`` are restored on creation.
    """

    def __init__(self):
        """Create the HTTP session and restore pickled cookies if present."""
        self.session = requests.session()
        if os.path.exists('jsstm_cookie'):
            # NOTE(review): unpickling can execute arbitrary code.  This is
            # only acceptable while 'jsstm_cookie' is written exclusively by
            # this script; do not load a cookie file from an untrusted source.
            with open('jsstm_cookie', 'rb') as f:
                self.session.cookies.update(pickle.load(f))

    def get_headers(self):
        """Return the fixed HTTP header dict sent with every POST."""
        headers = {
            'Origin': 'https://jsstm.jszwfw.gov.cn',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'X-Requested-With': 'XMLHttpRequest',
            # Template placeholder; quoted like every other placeholder in
            # this file so the unfilled template is still a valid string
            # instead of an unhashable set expression.
            'User-Agent': '{{User-Agent}}',
            'Referer': 'https://jsstm.jszwfw.gov.cn/jkmIndex.html?token={{token}}&jszwuserid={{token2}}&uuid={{token3}}',
            'x-mass-tappid': '2018062060350751',
            'Accept-Language': 'en-US',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Accept-Encoding': 'gzip',
            'Host': 'jsstm.jszwfw.gov.cn',
            'Connection': 'Keep-Alive',
        }
        return headers

    def http_post(self, job, url, data):
        """POST ``data`` to ``url`` and return the response.

        Args:
            job: Label used only in log messages.
            url: Target URL.
            data: Request body passed to ``session.post``.

        Returns:
            The response object when the status code is 200.

        Raises:
            SystemExit: With exit status 1 when the status code is not 200.
        """
        logger.debug('http_post start: %s', job)
        response = self.session.post(url, data, headers=self.get_headers())
        logger.debug('http_post end: %s', job)
        if response.status_code != 200:
            logger.error('http_post fail: %s', job)
            # The `exit()` helper from `site` is meant for interactive use and
            # reports status 0; raise SystemExit directly with a failure code.
            raise SystemExit(1)
        logger.info('http_post success: %s', job)
        return response

    def userAuth(self, data):
        """POST ``data`` to the ``/jkm/userAuth`` endpoint and return the response."""
        return self.http_post('userAuth', 'https://jsstm.jszwfw.gov.cn/jkm/userAuth', data)

    def get_urlabc(self, response):
        """Extract ``res.userdetail.abc`` from a JSON response and percent-encode it.

        Args:
            response: Object whose ``text`` attribute holds the JSON body.

        Returns:
            The ``abc`` value passed through ``urllib.parse.quote``.
        """
        response_dict = json.loads(response.text)
        abc = response_dict['res']['userdetail']['abc']
        return urllib.parse.quote(abc)

    def queryAttendanceList(self, data):
        """POST ``data`` to ``/day/queryAttendanceList`` and return the response."""
        return self.http_post(
            'queryAttendanceList',
            'https://jsstm.jszwfw.gov.cn/day/queryAttendanceList',
            data,
        )

    @staticmethod
    def _format_utc8(timestamp):
        """Format a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in UTC+8."""
        # A fixed +08:00 offset replaces the former zone lookup, whose name
        # ('China/Shang_Hai') is not a valid zone and silently fell back to
        # the machine's local time.  China has used UTC+8 without DST since
        # 1991, so a fixed offset matches Asia/Shanghai for current records.
        utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
        moment = datetime.datetime.fromtimestamp(timestamp, tz=utc_plus_8)
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    def get_last_attendance(self, response):
        """Log and return the time of the first record in an attendance list.

        Args:
            response: Object whose ``text`` attribute holds the JSON body; the
                value at ``res.list[0].createtime`` is treated as a POSIX
                timestamp in seconds.

        Returns:
            The timestamp rendered in UTC+8 as ``YYYY-MM-DD HH:MM:SS``.
        """
        response_dict = json.loads(response.text)
        timestamp = response_dict['res']['list'][0]['createtime']
        utcp8_str = self._format_utc8(timestamp)
        logger.info('last attendance: %s', utcp8_str)
        return utcp8_str

    def saveDailyAttendence(self, data):
        """POST ``data`` to ``/day/saveDailyAttendance`` and return the response."""
        # Returns the response like the sibling request methods (it used to be
        # dropped, so callers could not inspect the result).
        return self.http_post(
            'saveDailyAttendence',
            'https://jsstm.jszwfw.gov.cn/day/saveDailyAttendance',
            data,
        )
# %% telegram_bot
class Telegram_bot():
    """Sends a text message through the Telegram Bot API ``sendMessage`` call."""

    # NOTE(review): only the 'http' scheme is mapped here, and requests picks
    # a proxy by URL scheme, so this entry is not applied to the https://
    # Telegram URL used below.  Add an 'https' entry if the proxy is meant to
    # be used for these calls.
    proxies = {
        'http': '{{proxy}}'
    }

    @staticmethod
    def _message_url(token, chat_id, text):
        """Build the ``sendMessage`` URL with a percent-encoded query string."""
        # Encoding matters: the text is a multi-line log dump, and a raw '&',
        # '#' or '+' in it would otherwise split or truncate the query.
        query = urllib.parse.urlencode(
            {'chat_id': chat_id, 'text': text},
            quote_via=urllib.parse.quote,
        )
        return 'https://api.telegram.org/bot' + token + '/sendMessage?' + query

    def ping(self, token, chat_id, text):
        """Send ``text`` to ``chat_id`` using the bot identified by ``token``.

        Args:
            token: Bot token, inserted into the URL path.
            chat_id: Target chat identifier (string or integer).
            text: Message body.
        """
        requests.get(self._message_url(token, chat_id, text), proxies=self.proxies)
# %%
def exit_handler():
    """Send everything collected in the in-memory log buffer to Telegram."""
    # telegram_bot
    bot_token = '{{telegram_token}}'
    target_chat = '{{telegram_chat_id}}'
    collected_log = log_stringIO.getvalue()
    Telegram_bot().ping(bot_token, target_chat, collected_log)


# Registered at import time, so the report is sent whenever the interpreter
# shuts down normally, including after a SystemExit.
atexit.register(exit_handler)
if __name__ == '__main__':
    # Script flow: authenticate, submit today's attendance, read back the
    # latest record (which logs its time), then persist the session cookies.
    jsstm = Jsstm()
    # login
    data_userAuth = 'token={{token}}&uuid={{token2}}&source=alipay'.encode('utf-8')
    response = jsstm.userAuth(data_userAuth)
    urlabc = jsstm.get_urlabc(response)
    # saveDailyAttendence
    # The two '&degree...' fields had been mangled into '°ree...' (the '&deg'
    # prefix was decoded as an HTML entity), which fused them into the
    # preceding field values.  Placeholders: {{身份证}} = ID card number,
    # {{url姓名}} = URL-encoded name, {{手机号}} = mobile number.
    data_saveDailyAttendence = (
        'member_id={{身份证}}&name={{url姓名}}&mobile={{手机号}}&idType=1'
        '&degree=37.3&realtimeLocation=&source=alipay&degree_flag=0'
        '&r1data=0&r2data=0&r3data=0&travel=0&r5data='
        '&travel_destination=&travel_time=&travel_duration='
        '&travel_destination_code=&other=&abc=' + urlabc
    ).encode('utf-8')
    jsstm.saveDailyAttendence(data_saveDailyAttendence)
    # queryAttendanceList
    data_queryAttendanceList = ('userId={{身份证}}&abc=' + urlabc).encode('utf-8')
    response = jsstm.queryAttendanceList(data_queryAttendanceList)
    utcp8_str = jsstm.get_last_attendance(response)
    # save cookie across sessions
    with open('jsstm_cookie', 'wb') as f:
        pickle.dump(jsstm.session.cookies, f)