"""Multi-account client that enters the LivePocket ticket drawing for an event.

Accounts are read from ``livepocket/account.txt``; each account is processed
in its own thread with a proxy address taken from redis.
"""
import datetime
import json
import multiprocessing
import random
import re
import sched
import threading
import time
import traceback
from urllib import parse

import requests

from tools import loggerKit, redis_client
from tools.utils import get_random_browser, get_external_ip, get_random_proxy_at_redis, save_all_proxy_ip

# Seconds to wait for any single HTTP request. Without a timeout a stalled
# connection would block its worker thread (and the batch join) forever.
REQUEST_TIMEOUT = 30

_BASE_URL = "https://t.livepocket.jp/"
_EVENT_ID_MARKER = 'https://t.livepocket.jp/purchase/verify?event_id='
_TICKET_ID_PATTERN = re.compile(r'id="js_order_limited_(\d+)"')


def _login(account, passwd, user_agent, proxies, single_proxy):
    """Log in and cache the session values in redis.

    Returns:
        ``(token, ci_session, php_session)`` on success, or ``None`` when the
        response does not contain a token (after sleeping 120-180 seconds).
    """
    stamp = str(int(time.time()))
    login_url = f"https://t.livepocket.jp/api/sessions/create?mytimestamp={stamp}"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded;',
        'Referer': 'https://t.livepocket.jp/login?acroot=header-new_p_u_nl',
        'Sec-Ch-Ua': 'Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99',
        'User-Agent': user_agent,
        'path': f'/api/sessions/create?mytimestamp={stamp}',
        'Origin': 'https://t.livepocket.jp',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Priority': 'u=1, i',
    }
    form_data = {
        "login": account,
        "password": passwd,
        "auto_login": "on",
        "login_password": f"{account}&{passwd}",
    }
    data = parse.urlencode(form_data)

    # The session is only needed for this one request; close it afterwards so
    # pooled connections are not leaked on every login.
    with requests.session() as session:
        content = session.post(url=login_url, headers=headers, data=data,
                               proxies=proxies, timeout=REQUEST_TIMEOUT).text
        response_cookie = session.cookies.get_dict()

    if 'token' not in content:
        loggerKit.info(
            f'login account:{account}, 本机出口IP:{get_external_ip()}、代理IP:{single_proxy} 被封!!!')
        # Back off for a random 120-180 seconds before this worker finishes.
        wait_time = random.uniform(120, 180)
        loggerKit.info(f"等待 {wait_time:.2f} 秒...")
        time.sleep(wait_time)
        return None

    login_resp = json.loads(content)
    token = login_resp['result']['token']
    login_session = response_cookie['ci_session']
    php_session = response_cookie['PHPSESSID']
    loggerKit.info(f'login account:{account}, 代理IP:{single_proxy}, token: {token} 登录成功!!!')
    redis_client.set(f'token_{account}', token)
    redis_client.set(f'php_session_{account}', php_session)
    redis_client.set(f'ci_session_{account}', login_session)
    return token, login_session, php_session


def _resolve_event_id(cname, target_url, proxies):
    """Return the event id from redis, or scrape it from the event page.

    Returns ``None`` when the id is neither cached nor found on the page.
    """
    event_id = redis_client.get(f'event_id_{cname}')
    if event_id is not None and event_id != 0:
        return event_id

    response = requests.get(target_url, proxies=proxies, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        loggerKit.error('Failed to fetch the page. Status code: {0}', response.status_code)
        return event_id

    html_string = response.content.decode('utf8')
    start_index = html_string.find(_EVENT_ID_MARKER)
    if start_index == -1:
        loggerKit.warning('Substring not found.')
        return event_id

    # The id runs from the end of the marker up to the closing single quote.
    end_index = html_string.find("'", start_index)
    if end_index == -1:
        loggerKit.warning('End index not found.')
        return event_id

    event_id = html_string[start_index + len(_EVENT_ID_MARKER):end_index]
    redis_client.set(f'event_id_{cname}', event_id)
    loggerKit.info(f'event_id: {event_id}')
    return event_id


def _resolve_ticket_id(cname, target_url, proxies):
    """Return the ticket id from redis, or scrape it from the event page.

    Returns ``None`` when the id is neither cached nor found on the page.
    """
    ticket_id = redis_client.get(f'ticket_id_{cname}')
    if ticket_id is not None and ticket_id != 0:
        return ticket_id

    response = requests.get(target_url, proxies=proxies, timeout=REQUEST_TIMEOUT)
    if response.status_code == 200:
        match = _TICKET_ID_PATTERN.search(response.text)
        if match:
            ticket_id = match.group(1)
            redis_client.set(f'ticket_id_{cname}', ticket_id)
            loggerKit.info(f'ticket_id: {ticket_id}')
        else:
            loggerKit.warning("未找到匹配的数值")
    return ticket_id


def _follow_purchase_redirects(session, target_url, cname, event_id, ticket_id,
                               login_session, php_session, proxies):
    """Submit the purchase form and follow the three redirects by hand.

    Returns:
        The third ``Location`` URL (the confirm page carrying the one-time
        token), or ``None`` if any step does not redirect.
    """
    header = {
        'Host': 't.livepocket.jp',
        'User-Agent': "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/69.0.3947.100 Safari/537.36",
        'Content-Type': 'application/x-www-form-urlencoded',
        'Referer': f'{target_url}',
    }
    cookies = {
        'ci_session': f'{login_session}',
        'PHPSESSID': f'{php_session}',
        'list_count': '{"success":true,"result":{"myticket_count":{"count":"0"},"today_event":{"total_count":0,"data_list":[]},"unread_count":5},"submit":true}',
        'sns_status': '{"success":true,"result":{"facebook":0,"twitter":0,"mixi":0,"line":0,"yahoo":0,"plusid":1,"google":0},"submit":true}',
        'display_init': '{"success":true,"result":{"purchased_tickets":{"total_count":0,"data_list":{"ticket_info":[]}},"lottery_tickets":{"total_count":0,"data_list":{"ticket_info":[]}},"order_limited_event_tickets":{"data_list":{"ticket_info":[]},"total_count":0},"event_order_limit":true},"submit":true}',
    }
    ticket_key = f'ticket_id_{ticket_id}'
    form_data = {
        'redirect_url': 'https://t.livepocket.jp/purchase/',
        'event_id': event_id,
        'event_cname': f'{cname}',
        'ticket_type': 'lottery',
        'facebook_ticket_count': 0,
        'twitter_ticket_count': 0,
        'plusid_linkage_invalidation_flg': 0,
        ticket_key: 1,
    }
    loggerKit.info('form_data: {0}', form_data)

    # Step 1: post the form; expect a redirect to /purchase/security?...
    response = session.post('https://t.livepocket.jp/purchase?type=new', headers=header,
                            cookies=cookies, data=form_data, allow_redirects=False,
                            proxies=proxies, timeout=REQUEST_TIMEOUT)
    loggerKit.info('response.headers: {0}', response.headers)
    redirect_url1 = response.headers.get('Location')
    if redirect_url1 is None:
        return None
    loggerKit.info('redirect_url1: {0}', redirect_url1)

    # Step 2: expect a redirect to /purchase/confirm?id=...&reserved_session_id=...
    response2 = session.get(redirect_url1, headers=header, cookies=cookies,
                            allow_redirects=False, proxies=proxies, timeout=REQUEST_TIMEOUT)
    loggerKit.info('response2.headers: {0}', response2.headers)
    redirect_url2 = response2.headers.get('Location')
    if redirect_url2 is None:
        return None
    loggerKit.info('redirect_url2: {0}', redirect_url2)

    # Step 3: expect the same confirm URL with an additional otoken parameter.
    response3 = session.get(redirect_url2, headers=header, cookies=cookies,
                            allow_redirects=False, proxies=proxies, timeout=REQUEST_TIMEOUT)
    loggerKit.info('response3.headers: {0}', response3.headers)
    redirect_url3 = response3.headers.get('Location')
    if redirect_url3 is None:
        loggerKit.warning('redirect_url3 is None')
        return None
    loggerKit.info('redirect_url3: {0}', redirect_url3)
    return redirect_url3


def _enter_drawing(session, account, token, login_session, php_session, user_agent,
                   event_id, ticket_id, redirect_url3, proxies):
    """Post the drawing entry built from the confirm URL and log the result."""
    # The confirm URL is read positionally: the 2nd query parameter is the
    # reserved session id and the 3rd is the one-time token.
    url_parts = redirect_url3.split('?')
    params = url_parts[1].split('&') if len(url_parts) > 1 else []
    if len(params) < 3 or any('=' not in item for item in params[1:3]):
        loggerKit.warning(f'unexpected confirm url: {redirect_url3}')
        return
    # maxsplit=1 keeps any '=' that is part of the value itself.
    reserve_session_id = params[1].split('=', 1)[1]
    onetime_token = params[2].split('=', 1)[1]

    drawing_headers = {
        'Host': 't.livepocket.jp',
        'User-Agent': user_agent,
        'Content-Type': 'application/x-www-form-urlencoded',
        'Referer': f'{redirect_url3}',
    }
    drawing_cookies = {
        'ci_session': f'{login_session}',
        'PHPSESSID': f'{php_session}',
    }
    drawing_form_data = {
        'utoken': f'{token}',
        'onetime_token_name': 'buy_ticket',
        'onetime_token_value': onetime_token,
        'url': f'https://t.livepocket.jp/purchase/confirm?id={event_id}&reserved_session_id={reserve_session_id}',
        'payment_method': 0,
        'reserve_session_id': reserve_session_id,
        'event_id': event_id,
        'reserve_ticket[0][ticket_id]': ticket_id,
        'reserve_ticket[0][amount]': 1,
        'payment_type': 'credit',
    }
    drawing_response = session.post(
        f'https://t.livepocket.jp/api/drawing_tickets/entry?mytimestamp={str(int(time.time()))}',
        headers=drawing_headers, cookies=drawing_cookies, data=drawing_form_data,
        allow_redirects=False, proxies=proxies, timeout=REQUEST_TIMEOUT)
    loggerKit.info('drawing_response.text: {0}', drawing_response.text)

    drawing_data = json.loads(drawing_response.text)
    if drawing_data['success']:
        order_id = drawing_data['result']['order_id']
        onetime_token_name = drawing_data['result']['onetime_token_name']
        onetime_token_value = drawing_data['result']['onetime_token_value']
        loggerKit.info(
            'account: {0}, order_id: {1}, onetime_token_name: {2}, onetime_token_value: {3}, 抢到!!!',
            account, order_id, onetime_token_name, onetime_token_value)


def poc_2(account, passwd, target_url, log_name, single_proxy, random_index):
    """Run the full login -> purchase -> drawing-entry flow for one account.

    Args:
        account: Login name.
        passwd: Login password.
        target_url: Event page URL; its last path segment is the event cname.
        log_name: Unused; kept for interface compatibility.
        single_proxy: ``[user:pass@]host:port`` proxy address. When ``None``
            the function does nothing.
        random_index: Unused; kept for interface compatibility.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError, ValueError: When a response lacks the expected fields or is
            not valid JSON.
    """
    if single_proxy is None:
        return

    # A lock used to be created here on every call; a per-call lock is never
    # shared between threads, so it gave no mutual exclusion and was removed.
    cname = target_url.replace(_BASE_URL, "").split('/')[-1]
    # The password is deliberately not logged.
    loggerKit.info(f'account: {account}')
    user_agent = get_random_browser()

    # NOTE(review): only the 'http' scheme is mapped. requests selects a proxy
    # by URL scheme, so the https:// URLs used below presumably bypass this
    # proxy unless one is configured through the environment - confirm.
    proxies = {
        'http': f'http://{single_proxy}',
    }
    loggerKit.info(f'proxies: {proxies}')

    # Reuse the session values cached in redis; log in only when absent.
    token = redis_client.get(f'token_{account}')
    if token is None:
        credentials = _login(account, passwd, user_agent, proxies, single_proxy)
        if credentials is None:
            return
        token, login_session, php_session = credentials
    else:
        login_session = redis_client.get(f'ci_session_{account}')
        php_session = redis_client.get(f'php_session_{account}')

    event_id = _resolve_event_id(cname, target_url, proxies)
    ticket_id = _resolve_ticket_id(cname, target_url, proxies)
    if event_id is None or ticket_id is None:
        # Without both ids the purchase form cannot be built meaningfully.
        loggerKit.warning(f'event_id or ticket_id unavailable for {cname}, account:{account}')
        return

    with requests.Session() as purchase_session:
        redirect_url3 = _follow_purchase_redirects(
            purchase_session, target_url, cname, event_id, ticket_id,
            login_session, php_session, proxies)
        if redirect_url3 is None:
            return
        _enter_drawing(purchase_session, account, token, login_session, php_session,
                       user_agent, event_id, ticket_id, redirect_url3, proxies)


def process_account(pocket_url, account, log_name, single_proxy, random_index):
    """Thread entry point: run the flow for one account and log any error.

    NOTE(review): ``pocket_url`` is accepted but not used; the event URL and
    the password are hard-coded below. Confirm which URL is intended and move
    the password out of the source.
    """
    try:
        run_poc(account.strip(), 'panyue666', 'https://t.livepocket.jp/e/z230-', log_name, single_proxy,
                random_index)
    except Exception:
        loggerKit.error(f'error: {traceback.format_exc()}, account:{account}')


def process_running(max_threads, pocket_url):
    """Process every account in ``livepocket/account.txt`` in thread batches.

    Threads are started one per account; once ``max_threads`` are running the
    whole batch is joined before more are started.
    """
    log_name = f"data_{datetime.datetime.now().strftime('%Y%m%d%H%M')}.txt"
    with open('livepocket/account.txt', 'r') as accounts:
        # Skip blank lines so they are not treated as an empty account name.
        account_list = [line for line in accounts if line.strip()]

    threads = []
    for account in account_list:
        # Proxy addresses are taken straight from redis.
        single_proxy, random_index = get_random_proxy_at_redis(600)
        thread = threading.Thread(target=process_account,
                                  args=(pocket_url, account, log_name, single_proxy, random_index))
        threads.append(thread)
        thread.start()

        # When the batch is full, wait for all of it before starting more.
        if len(threads) >= max_threads:
            for t in threads:
                t.join()
            threads.clear()

    # Wait for the final, possibly partial, batch.
    for thread in threads:
        thread.join()


def run_poc(account, password, url, log_name, single_proxy, random_index):
    """Call :func:`poc_2` and log, rather than propagate, any exception."""
    try:
        poc_2(account, password, url, log_name, single_proxy, random_index)
    except Exception:
        loggerKit.error(f'error: {traceback.format_exc()}, account:{account}')


def run_daily_job(hour, minute):
    """Block until the next ``hour:minute`` local time, then run looper_job."""
    now = datetime.datetime.now()
    run_time = datetime.datetime(now.year, now.month, now.day, hour, minute)
    if run_time < now:
        # Today's slot has already passed; schedule for tomorrow.
        run_time = run_time + datetime.timedelta(days=1)
    wait_time = (run_time - now).total_seconds()

    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(wait_time, 1, looper_job, ())
    scheduler.run()


def looper_job():
    """Run :func:`process_running` over the account list forever."""
    while True:
        process_running(max_threads=8, pocket_url='https://t.livepocket.jp/e/kpzzl')


if __name__ == '__main__':
    # Required so the script can be frozen into a multiprocessing executable.
    multiprocessing.freeze_support()
    save_all_proxy_ip(100, 9)
    # Alternative entry point that starts at a fixed time: run_daily_job(8, 52)
    looper_job()