8-simulated-login

8. 模拟登录

Cookies池的搭建

  1. 配置文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# Redis connection settings
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# Redis password; leave as None when the server has none
REDIS_PASSWORD = None

# Browser driven by the cookie generators
BROWSER_TYPE = 'Chrome'

# Generator class name per site; register additional sites here
GENERATOR_MAP = {
    'weibo': 'WeiboCookiesGenerator',
}

# Tester class name per site; register additional sites here
TESTER_MAP = {
    'weibo': 'WeiboValidTester',
}

# URL requested by the tester to check whether stored cookies still work
TEST_URL_MAP = {
    'weibo': 'https://m.weibo.cn/',
}

# Seconds between two generator / tester passes
CYCLE = 120

# Address and port the HTTP API binds to
API_HOST = '0.0.0.0'
API_PORT = 5000

# Process switches read by the scheduler:
# generator - simulate logins and add cookies
GENERATOR_PROCESS = False
# validator - periodically check stored cookies and delete unusable ones
VALID_PROCESS = False
# HTTP API service
API_PROCESS = True

2.存储模块
存储账号信息和cookies信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import random
import redis
from cookiespool.config import *

class RedisClient(object):
    """Thin wrapper around a single Redis hash keyed by username.

    The hash is named ``"<type>:<website>"``; each field is a username and
    each value is whatever the caller stores for it (a password or a
    serialized cookie string).
    """

    def __init__(self, type, website, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Open the Redis connection.

        :param type: first part of the hash name, e.g. 'accounts' or 'cookies'
        :param website: second part of the hash name, e.g. 'weibo'
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password, or None
        """
        # decode_responses=True makes the client return str instead of bytes.
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
        self.type = type
        self.website = website

    def name(self):
        """
        Build the name of the hash this client operates on.

        :return: hash name in the form "<type>:<website>"
        """
        return "{type}:{website}".format(type=self.type, website=self.website)

    def set(self, username, value):
        """
        Store a value for a username.

        :param username: hash field
        :param value: password or cookies to store
        :return: result of the underlying HSET call
        """
        return self.db.hset(self.name(), username, value)

    def get(self, username):
        """
        Fetch the value stored for a username.

        :param username: hash field
        :return: result of the underlying HGET call
        """
        return self.db.hget(self.name(), username)

    def delete(self, username):
        """
        Remove a username from the hash.

        :param username: hash field
        :return: result of the underlying HDEL call
        """
        return self.db.hdel(self.name(), username)

    def count(self):
        """
        Count the entries in the hash.

        :return: result of the underlying HLEN call
        """
        return self.db.hlen(self.name())

    def random(self):
        """
        Pick one stored value at random (used to hand out random cookies).

        :return: a random value, or None when the hash holds no values
        """
        values = self.db.hvals(self.name())
        # random.choice raises IndexError on an empty sequence; report an
        # empty pool as None instead.
        if not values:
            return None
        return random.choice(values)

    def usernames(self):
        """
        List every username in the hash.

        :return: result of the underlying HKEYS call
        """
        return self.db.hkeys(self.name())

    def all(self):
        """
        Fetch the whole hash.

        :return: mapping of username to password or cookies
        """
        return self.db.hgetall(self.name())

if __name__ == '__main__':
    # Manual smoke test: stores one dummy field in the accounts:weibo hash
    # and prints what the client returned.
    conn = RedisClient('accounts', 'weibo')
    result = conn.set('hell2o', 'sss3s')
    print(result)

3.生成模块
获取各个账号信息,并模拟登录,生成Cookies并保存

3.1 由于是以新浪微博为例,新浪微博四宫格验证码破解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import time
from io import BytesIO
from PIL import Image
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from os import listdir
from os.path import abspath, dirname
from selenium import webdriver

# Directory of captcha template images, resolved relative to this file.
TEMPLATES_FOULDER = dirname(abspath(__file__)) + '/templates/'
# Absolute path of the chromedriver executable (a Windows install location).
ABSPATH = abspath(r"C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe")

class WeiboCookies():
    """Log in to mobile Weibo through Selenium and collect the session cookies.

    When the login page shows the four-point pattern captcha, a screenshot of
    it is compared against the images in TEMPLATES_FOULDER; the matching
    template's file name gives the order in which the points are dragged.
    """

    def __init__(self, username, password, browser):
        """
        :param username: account name typed into the login form
        :param password: password typed into the login form
        :param browser: Selenium WebDriver used for the whole login
        """
        self.url = 'https://passport.weibo.cn/signin/login?entry=mweibo&r=https://m.weibo.cn/'
        self.browser = browser
        self.wait = WebDriverWait(self.browser, 20)
        self.username = username
        self.password = password

    def open(self):
        """
        Load the login page, fill in username and password, and submit.

        :return: None
        """
        # Drop cookies first so a previous account's session is not reused.
        self.browser.delete_all_cookies()
        self.browser.get(self.url)
        username = self.wait.until(EC.presence_of_element_located((By.ID, 'loginName')))
        password = self.wait.until(EC.presence_of_element_located((By.ID, 'loginPassword')))
        submit = self.wait.until(EC.element_to_be_clickable((By.ID, 'loginAction')))
        username.send_keys(self.username)
        password.send_keys(self.password)
        time.sleep(2)
        submit.click()

    def password_error(self):
        """
        Check whether the page reports a wrong username or password.

        :return: the wait result when the error text shows up within
                 5 seconds, otherwise False
        """
        try:
            return WebDriverWait(self.browser, 5).until(
                EC.text_to_be_present_in_element((By.ID, 'errorMsg'), '用户名或密码错误'))
        except TimeoutException:
            return False

    def login_successfully(self):
        """
        Check whether the login succeeded.

        :return: True when an element with class 'drop-title' is present
                 within 5 seconds, otherwise False
        """
        try:
            # This condition yields the located element rather than a
            # boolean, hence the bool() conversion.
            return bool(
                WebDriverWait(self.browser, 5).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'drop-title')))
            )
        except TimeoutException:
            return False

    def get_position(self):
        """
        Locate the captcha element on the page.

        If the captcha does not appear, the login form is submitted once more
        and the wait is repeated.

        :return: tuple (top, bottom, left, right) of the captcha
        :raises TimeoutException: when the captcha is still missing after
                                  the retry
        """
        locator = (By.CLASS_NAME, 'patt-shadow')
        try:
            img = self.wait.until(EC.presence_of_element_located(locator))
        except TimeoutException:
            print('未出现验证码')
            self.open()
            # Wait again so `img` is always bound before it is used below.
            img = self.wait.until(EC.presence_of_element_located(locator))
        time.sleep(2)
        location = img.location
        size = img.size
        top, bottom = location['y'], location['y'] + size['height']
        left, right = location['x'], location['x'] + size['width']
        return (top, bottom, left, right)

    def get_screenshot(self):
        """
        Take a screenshot of the current page.

        :return: screenshot as a PIL image
        """
        png_bytes = self.browser.get_screenshot_as_png()
        return Image.open(BytesIO(png_bytes))

    def get_image(self, name='captcha.png'):
        """
        Cut the captcha out of a page screenshot.

        :param name: unused; kept so existing callers keep working
        :return: captcha as a PIL image
        """
        top, bottom, left, right = self.get_position()
        print('验证码位置:', top, bottom, left, right)
        screenshot = self.get_screenshot()
        # Image.crop expects its box as (left, upper, right, lower), which is
        # a different order from the tuple get_position() returns.
        captcha = screenshot.crop((left, top, right, bottom))
        return captcha

    @staticmethod
    def _pixels_close(pixel1, pixel2, threshold=20):
        """Return True when the first three channels each differ by less than threshold."""
        return all(abs(pixel1[channel] - pixel2[channel]) < threshold for channel in range(3))

    def is_pixel_equal(self, image1, image2, x, y):
        """
        Compare one pixel of two images channel by channel.

        :param image1: first image
        :param image2: second image
        :param x: x coordinate
        :param y: y coordinate
        :return: True when the pixels are close enough, otherwise False
        """
        return self._pixels_close(image1.load()[x, y], image2.load()[x, y])

    def same_image(self, image, template):
        """
        Decide whether a captcha matches a template.

        :param image: captcha to recognise
        :param template: template image
        :return: True when more than 99% of the captcha's pixels are close to
                 the template's, otherwise False
        """
        # Similarity threshold
        threshold = 0.99
        total = image.width * image.height
        # An empty image has nothing to compare, and a template smaller than
        # the captcha would be indexed out of range.
        if total == 0 or template.width < image.width or template.height < image.height:
            return False
        # Fetch the pixel accessors once instead of once per pixel.
        image_pixels = image.load()
        template_pixels = template.load()
        count = sum(
            1
            for x in range(image.width)
            for y in range(image.height)
            if self._pixels_close(image_pixels[x, y], template_pixels[x, y])
        )
        if float(count) / total > threshold:
            print('匹配成功')
            return True
        return False

    def detect_image(self, image):
        """
        Find the template that matches the captcha.

        :param image: captcha image
        :return: drag order as a list of ints taken from the digits of the
                 matching template's file name, or None when nothing matches
        """
        for template_name in listdir(TEMPLATES_FOULDER):
            print('正在匹配:', template_name)
            # The context manager releases the template's file handle.
            with Image.open(TEMPLATES_FOULDER + template_name) as template:
                matched = self.same_image(image, template)
            if matched:
                # The part of the file name before the first dot is the order.
                numbers = [int(digit) for digit in template_name.split('.')[0]]
                print('拖动顺序,', numbers)
                return numbers
        return None

    def move(self, numbers):
        """
        Drag across the four captcha points in the given order.

        :param numbers: four 1-based point indices
        :return: True when the drag sequence was performed, False when it
                 could not be
        """
        if not numbers or len(numbers) < 4:
            return False
        try:
            # The four captcha points
            circles = self.browser.find_elements(By.CSS_SELECTOR, '.patt-wrap .patt-circ')
            dx = dy = 0
            for index in range(4):
                circle = circles[numbers[index] - 1]
                if index == 0:
                    # Press and hold on the first point
                    ActionChains(self.browser) \
                        .move_to_element_with_offset(circle, circle.size['width'] / 2, circle.size['height'] / 2) \
                        .click_and_hold().perform()
                else:
                    # Cover the distance to this point in many small moves
                    times = 30
                    for _ in range(times):
                        ActionChains(self.browser).move_by_offset(dx / times, dy / times).perform()
                        time.sleep(1 / times)
                if index == 3:
                    # Last point reached: release the mouse button
                    ActionChains(self.browser).release().perform()
                else:
                    # Offset from the current point to the next one
                    next_circle = circles[numbers[index + 1] - 1]
                    dx = next_circle.location['x'] - circle.location['x']
                    dy = next_circle.location['y'] - circle.location['y']
        except Exception as exc:
            # Report the failure instead of hiding it, and let
            # KeyboardInterrupt / SystemExit propagate.
            print('拖动验证码失败', exc)
            return False
        return True

    def get_cookies(self):
        """
        Read the browser's cookies.

        :return: whatever the driver's get_cookies() returns
        """
        return self.browser.get_cookies()

    def main(self):
        """
        Run the whole login flow.

        :return: dict with 'status' and 'content': status 1 carries the
                 cookies, status 2 a wrong-credentials message, status 3 a
                 login-failure message
        """
        failure = {
            'status': 3,
            'content': '登录失败'
        }
        self.open()
        if self.password_error():
            return {
                'status': 2,
                'content': '用户名或密码错误'
            }
        # Logged in directly, no captcha needed
        if self.login_successfully():
            return {
                'status': 1,
                'content': self.get_cookies()
            }
        # Fetch and solve the captcha
        try:
            image = self.get_image('captcha.png')
        except TimeoutException:
            # No captcha appeared even after retrying.
            return failure
        numbers = self.detect_image(image)
        if numbers is None:
            # No template matched, so there is no drag order to try.
            return failure
        self.move(numbers)
        if self.login_successfully():
            return {
                'status': 1,
                'content': self.get_cookies()
            }
        return failure

if __name__ == '__main__':
    br = webdriver.Chrome(executable_path=ABSPATH)
    try:
        # NOTE(review): the account name and password are hard-coded sample
        # values; do not commit real credentials here.
        result = WeiboCookies('13480729500', 'fegg', br).main()
        print(result)
    finally:
        # Always shut the driver down, even when the login flow raises.
        br.quit()

3.2 对接3.1后的生成模块代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from cookiespool.config import *
from cookiespool.db import RedisClient
from login.weibo.cookies import WeiboCookies
import os

# Absolute path of the chromedriver executable (a Windows install location).
ABSPATH = os.path.abspath(r"C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe")

class CookiesGenerator(object):
    """Base class that logs in every stored account and saves its cookies.

    Subclasses implement new_cookies() for a concrete site.
    """

    def __init__(self, website = 'default'):
        """
        Set up the two Redis hashes and the browser.

        :param website: site name, used as the second part of the hash names
        """
        self.website = website
        self.cookies_db = RedisClient('cookies', self.website)
        self.accounts_db = RedisClient('accounts', self.website)
        self.init_browser()

    def __del__(self):
        self.close()

    def init_browser(self):
        """
        Create the browser named by BROWSER_TYPE for the simulated logins.

        self.browser stays None when BROWSER_TYPE is neither 'PhantomJS'
        nor 'Chrome'.

        :return: None
        """
        self.browser = None
        if BROWSER_TYPE == 'PhantomJS':
            # Work on a copy so the shared DesiredCapabilities dict is not modified.
            caps = dict(DesiredCapabilities.PHANTOMJS)
            caps["phantomjs.page.settings.userAgent"] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
            self.browser = webdriver.PhantomJS(desired_capabilities=caps)
            self.browser.set_window_size(1400, 500)
        elif BROWSER_TYPE == 'Chrome':
            self.browser = webdriver.Chrome(executable_path=ABSPATH)

    def new_cookies(self, username, password):
        """
        Log in and produce cookies; must be implemented by subclasses.

        :param username: account name
        :param password: account password
        :return: dict with 'status' and 'content' keys, as consumed by run()
        """
        raise NotImplementedError

    def process_cookies(self, cookies):
        """
        Flatten browser cookies into a name -> value mapping.

        :param cookies: iterable of dicts that each have 'name' and 'value'
        :return: dict mapping cookie name to cookie value
        """
        return {cookie['name']: cookie['value'] for cookie in cookies}

    def run(self):
        """
        Log in every account that has no stored cookies yet.

        Status 1 saves the cookies, status 2 deletes the account, any other
        status only prints the result's content.

        :return: None
        """
        accounts_usernames = self.accounts_db.usernames()
        # A set makes the per-account membership test O(1).
        cookies_usernames = set(self.cookies_db.usernames())
        for username in accounts_usernames:
            if username in cookies_usernames:
                continue
            password = self.accounts_db.get(username)
            # NOTE(review): this prints the plaintext password to stdout.
            print('正在生成Cookies', '账号', username, '密码', password)
            result = self.new_cookies(username, password)
            status = result.get('status')
            if status == 1:
                # Login succeeded
                cookies = self.process_cookies(result.get('content'))
                print('成功获取Cookies', cookies)
                if self.cookies_db.set(username, json.dumps(cookies)):
                    print('成功保存Cookies')
            elif status == 2:
                print(result.get('content'))
                if self.accounts_db.delete(username):
                    print('账户删除成功')
            else:
                print(result.get('content'))
        # Printed after every pass, whatever the individual results were.
        print('所有账户都已经成功获取')

    def close(self):
        """
        Shut the browser down; safe to call repeatedly.

        :return: None
        """
        # getattr covers instances on which init_browser() never ran.
        browser = getattr(self, 'browser', None)
        if browser is None:
            print('Browser not opened')
            return
        try:
            print('Closing Browser')
            # quit() ends the driver session; close() would only close the
            # current window.
            browser.quit()
        except TypeError:
            print('Browser not opened')
        finally:
            # Forget the browser so a later close() or __del__ is a no-op.
            self.browser = None


class WeiboCookiesGenerator(CookiesGenerator):
    """Cookie generator for Weibo, delegating the login to WeiboCookies."""

    def __init__(self, website='weibo'):
        """
        :param website: site name passed on to CookiesGenerator
        """
        # The base initializer already stores `website` on the instance.
        CookiesGenerator.__init__(self, website)

    def new_cookies(self, username, password):
        """
        Log in to Weibo with this generator's browser.

        :param username: account name
        :param password: account password
        :return: the result of WeiboCookies.main()
        """
        return WeiboCookies(username, password, self.browser).main()

if __name__ == '__main__':
    # One generator pass for Weibo when this module is run directly.
    generator = WeiboCookiesGenerator()
    generator.run()
  4. 检测模块
    定时检测Cookies,删除过期失效的Cookies,并生成新的Cookies。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import requests
from requests.exceptions import ConnectionError
from cookiespool.db import *

class ValidTester(object):
    """Base class that runs a check over every stored cookie entry.

    Subclasses implement test() for a concrete site.
    """

    def __init__(self, website='default'):
        """
        :param website: site name, used as the second part of the hash names
        """
        self.website = website
        self.cookies_db = RedisClient('cookies', self.website)
        self.accounts_db = RedisClient('accounts', self.website)

    def test(self, username, cookies):
        """
        Check one account's cookies; must be implemented by subclasses.

        :param username: account the cookies belong to
        :param cookies: stored cookies value for that account
        """
        raise NotImplementedError

    def run(self):
        """Call test() for every username/cookies pair in the cookies hash."""
        cookies_groups = self.cookies_db.all()
        for username, cookies in cookies_groups.items():
            self.test(username, cookies)

class WeiboValidTester(ValidTester):
    """Checks stored Weibo cookies by requesting the configured test URL."""

    def __init__(self, website='weibo'):
        """
        :param website: site name; it must be a key of TEST_URL_MAP, hence
                        the 'weibo' default
        """
        ValidTester.__init__(self, website)

    def test(self, username, cookies):
        """
        Validate one account's cookies and delete them when unusable.

        Cookies are deleted when they cannot be parsed as JSON or when the
        test URL answers with a status other than 200. A failed request only
        prints the error and leaves the cookies in place.

        :param username: account the cookies belong to
        :param cookies: JSON string as stored in the cookies hash
        """
        print('正在测试Cookies', '用户名', username)
        try:
            cookies = json.loads(cookies)
        except (TypeError, ValueError):
            # TypeError: the value is not a string (e.g. None);
            # ValueError: the string is not valid JSON.
            print('Cookies不合法', username)
            self.cookies_db.delete(username)
            print('删除Cookies', username)
            return
        # NOTE(review): TEST_URL_MAP is not imported by name in this module;
        # presumably it arrives through the star import of cookiespool.db - confirm.
        test_url = TEST_URL_MAP[self.website]
        try:
            # Redirects are not followed, so a redirect response is judged
            # by its own (non-200) status code.
            response = requests.get(test_url, cookies=cookies, timeout=5, allow_redirects=False)
        except requests.exceptions.RequestException as e:
            # Covers connection errors and timeouts alike, so one bad request
            # does not abort the checks for the remaining accounts.
            print('异常', e.args)
            return
        if response.status_code == 200:
            print('Cookies有效', username)
        else:
            print(response.status_code, response.headers)
            print('Cookies失效', username)
            self.cookies_db.delete(username)
            print('删除Cookies', username)

if __name__ == '__main__':
    # One validation pass over the Weibo cookies when run directly.
    WeiboValidTester().run()
  5. 接口模块
    为方便一个Cookies池同时供多个爬虫使用,这里定义一个Web接口,爬虫访问此接口即可获得随机的Cookies。接口利用Flask来搭建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import json
from flask import Flask, g
from cookiespool.config import *
from cookiespool.db import *

# Names exported by a star import of this module.
__all__ = ['app']

# Flask application on which the routes below are registered.
app = Flask(__name__)

@app.route('/')
def index():
    """Landing page: a short HTML greeting."""
    return '<h2>Hi this is Cookie Pool System</h2>'

def get_conn():
    """
    Attach one cookies client and one accounts client per site to flask.g.

    For every site in GENERATOR_MAP the clients are stored as
    ``g.<site>_cookies`` and ``g.<site>_accounts``.

    :return: the flask ``g`` object carrying the clients
    """
    for website in GENERATOR_MAP:
        print(website)
        # Test for the attribute that is actually stored, so existing
        # clients are reused instead of being rebuilt on every call.
        if not hasattr(g, website + '_cookies'):
            setattr(g, website + '_cookies', RedisClient('cookies', website))
            setattr(g, website + '_accounts', RedisClient('accounts', website))
    return g

@app.route('/<website>/random')
def randoma(website):
    """
    Return random cookies for a site, e.g. GET /weibo/random.

    :param website: site name taken from the URL
    :return: the stored cookies string, or a JSON error body with status 404
             when the site is unknown or its pool is empty
    """
    g = get_conn()
    # `website` comes from the URL, so it may name a site with no client.
    client = getattr(g, website + '_cookies', None)
    if client is None:
        return json.dumps({'status': '0', 'msg': 'unknown website'}), 404
    try:
        cookies = client.random()
    except IndexError:
        # random.choice on an empty pool raises IndexError.
        cookies = None
    if cookies is None:
        return json.dumps({'status': '0', 'msg': 'no cookies available'}), 404
    return cookies

@app.route('/<website>/add/<username>/<password>')
def add(website, username, password):
    """
    Add an account, e.g. GET /weibo/add/user/password.

    NOTE(review): the password is part of the request path and is printed
    below, so it ends up in logs.

    :param website: site name taken from the URL
    :param username: account name
    :param password: account password
    :return: JSON {'status': '1'} on success, or a JSON error body with
             status 404 when the site is unknown
    """
    g = get_conn()
    # `website` comes from the URL, so it may name a site with no client.
    accounts = getattr(g, website + '_accounts', None)
    if accounts is None:
        return json.dumps({'status': '0', 'msg': 'unknown website'}), 404
    print(username, password)
    accounts.set(username, password)
    return json.dumps({'status': '1'})

@app.route('/<website>/count')
def count(website):
    """
    Report how many cookies are stored for a site, e.g. GET /weibo/count.

    :param website: site name taken from the URL
    :return: JSON with 'status' and 'count', or a JSON error body with
             status 404 when the site is unknown
    """
    g = get_conn()
    # `website` comes from the URL, so it may name a site with no client.
    client = getattr(g, website + '_cookies', None)
    if client is None:
        return json.dumps({'status': '0', 'msg': 'unknown website'}), 404
    total = client.count()
    return json.dumps({'status': '1', 'count': total})

if __name__ == '__main__':
    # Bind to the configured address and port, matching how the scheduler
    # starts the same app.
    app.run(host=API_HOST, port=API_PORT)
  6. 调度模块

为驱动前5个模块定时运行,调度模块让各个模块在不同的进程上互相配合运行起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import time
from multiprocessing import Process
from cookiespool.api import app
from cookiespool.config import *
from cookiespool.generator import *
from cookiespool.tester import *

class Scheduler(object):
    """Starts the API, generator and validator in separate processes."""

    @staticmethod
    def valid_cookie(cycle=CYCLE):
        """
        Run every configured tester in an endless loop.

        :param cycle: seconds to sleep between two passes
        """
        while True:
            print('Cookies检测进程开始运行')
            try:
                for website, cls in TESTER_MAP.items():
                    # Look the class up by name among this module's globals.
                    tester = globals()[cls](website=website)
                    tester.run()
                    print('Cookies检测完成')
                    del tester
            except Exception as e:
                print(e.args)
            # Sleep after failed passes too, so a persistent error cannot
            # turn this loop into a busy loop.
            time.sleep(cycle)

    @staticmethod
    def generator_cookies(cycle=CYCLE):
        """
        Run every configured generator in an endless loop.

        :param cycle: seconds to sleep between two passes
        """
        while True:
            print('Cookies生成进程开始进行')
            try:
                for website, cls in GENERATOR_MAP.items():
                    # Look the class up by name among this module's globals.
                    generator = globals()[cls](website=website)
                    try:
                        generator.run()
                        print('Cookies生成完成')
                    finally:
                        # Close the generator even when run() raises.
                        generator.close()
            except Exception as e:
                print(e.args)
            # Sleep after failed passes too, so a persistent error cannot
            # turn this loop into a busy loop.
            time.sleep(cycle)

    @staticmethod
    def api():
        """Serve the HTTP API on API_HOST:API_PORT."""
        print('API 接口开始运行')
        app.run(host=API_HOST, port=API_PORT)

    def run(self):
        """Start one process for each component enabled in the config."""
        if API_PROCESS:
            api_process = Process(target=Scheduler.api)
            api_process.start()

        if GENERATOR_PROCESS:
            generator_process = Process(target=Scheduler.generator_cookies)
            generator_process.start()

        if VALID_PROCESS:
            valid_process = Process(target=Scheduler.valid_cookie)
            # start must be called; a bare attribute reference does nothing.
            valid_process.start()
  7. 运行调试小模块

7.1 局部账号密码信息录入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import requests

from cookiespool.db import RedisClient

# Module-level client for the accounts:weibo hash, created at import time.
conn = RedisClient('accounts', 'weibo')

def set(account, sep='----'):
    """
    Store one "username<sep>password" line in the accounts hash.

    :param account: text containing username and password joined by sep
    :param sep: separator between username and password
    """
    try:
        # Split on the first separator only, so a password may contain it.
        username, password = account.split(sep, 1)
    except ValueError:
        # No separator in the line: report it instead of aborting the import.
        print('录入失败', account)
        return
    result = conn.set(username, password)
    print('账号', username, '密码', password)
    # NOTE(review): presumably `result` is Redis' HSET reply (the number of
    # newly created fields), so overwriting an existing username would print
    # the failure message - confirm.
    print('录入成功' if result else '录入失败')

def scan():
    """Read account lines from stdin until 'exit' or end of input, storing each."""
    print('请输入账号密码组,输入exit退出读入')
    while True:
        try:
            account = input()
        except EOFError:
            # Input was piped or closed: stop rather than crash.
            break
        if account == 'exit':
            break
        if not account.strip():
            # Nothing to store on a blank line.
            continue
        set(account)
if __name__ == '__main__':
    # The module name is '__main__' with trailing underscores; without them
    # this guard can never be true.
    scan()

7.2 在入口文件同级目录下,调用账号密码信息输入

1
2
3
4
from cookiespool.importer import scan

if __name__ == '__main__':
    # Start the interactive account import.
    scan()

7.3 入口文件

1
2
3
4
5
6
7
8
from cookiespool.scheduler import Scheduler


def main():
    """Create the scheduler and start the processes enabled in the config."""
    s = Scheduler()
    s.run()


if __name__ == '__main__':
    main()
分享到