从一个启动浏览器并打开百度网页的代码开始
from selenium import webdriver

# Start a Chrome session and open the Baidu home page.
# `Chrome` is the class exported by selenium/webdriver/__init__.py;
# `webdriver.chrome` (lower case) is only the sub-package and is not callable.
driver = webdriver.Chrome()
driver.get('https://www.baidu.com')
from selenium import webdriver
这行代码表示从selenium导入webdriver。进入selenium, 发现webdriver是一个包,那么导入的其实是webdriver包下的`__init__.py`文件
from .firefox.webdriver import WebDriver as Firefox # noqafrom .firefox.firefox_profile import FirefoxProfile # noqafrom .firefox.options import Options as FirefoxOptions # noqa #实例化的是.chrome.webdriver里的webDriverfrom .chrome.webdriver import WebDriver as Chrome # noqafrom .chrome.options import Options as ChromeOptions # noqafrom .ie.webdriver import WebDriver as Ie # noqafrom .ie.options import Options as IeOptions # noqafrom .edge.webdriver import WebDriver as Edge # noqafrom .opera.webdriver import WebDriver as Opera # noqafrom .safari.webdriver import WebDriver as Safari # noqafrom .blackberry.webdriver import WebDriver as BlackBerry #noqafrom .phantomjs.webdriver import WebDriver as PhantomJS # noqafrom .android.webdriver import WebDriver as Android # noqafrom .webkitgtk.webdriver import WebDriver as WebKitGTK # noqafrom .webkitgtk.options import Options as WebKitGTKOptionsfrom .remote.webdriver import WebDriver as Remote # noqafrom .common.desired_capabilities import DesiredCapabilities from .common.action_chains import ActionChains # noqafrom .common.touch_actions import TouchActions # noqafrom .common.proxy import Proxy # noqa
打开chrome.webdriver文件,下面只展示出相关代码
#selenium/webdriver/chrome/webdriver.pyimport warningsfrom selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriverfrom .remote_connection import ChromeRemoteConnectionfrom .service import Servicefrom .options import Optionsclass WebDriver(RemoteWebDriver): def __init__(self, executable_path="chromedriver", port=0, options=None, service_args=None, desired_capabilities=None, service_log_path=None, chrome_options=None): """ 参数: - executable_path - chromedriver的执行路径 默认在环境变里中查找 - port -http连接的端口号 - desired_capabilities: 一般浏览器的字典对象 - options: ChromeOptions的实例 """ #………………………………省略………………………………………… #第1步 实例化一个Service对象 self.service = Service( executable_path, port=port, service_args=service_args, log_path=service_log_path) #第2步 调用了service的start方法 self.service.start() #………………………………省略…………………………………………
WebDriver构造方法中最先实例化Service类,我们实例化Chrome()时并没有传参数,所以Service的参数 executable_path="chromedriver"、port=0,其余都是None
打开Chrome目录Service文件, 只有以下代码
# selenium/webdriver/chrome/service.py
from selenium.webdriver.common import service


class Service(service.Service):
    """
    Service object that manages starting and stopping the ChromeDriver.
    """

    def __init__(self, executable_path, port=0, service_args=None,
                 log_path=None, env=None):
        """
        Build the chromedriver argument list and initialise the base Service.

        :Args:
         - executable_path : path of the chromedriver executable
         - port : port the service should run on
         - service_args : list of extra arguments passed to chromedriver
         - log_path : path of the chromedriver log file
         - env : environment forwarded to the base class
        """
        if not service_args:
            service_args = []
        self.service_args = service_args

        if log_path:
            log_flag = '--log-path=%s' % log_path
            self.service_args.append(log_flag)

        # Step 1: delegate to the parent-class constructor.
        service.Service.__init__(
            self,
            executable_path,
            port=port,
            env=env,
            start_error_message="Please see https://sites.google.com/a/chromium.org/chromedriver/home")

    def command_line_args(self):
        """Override of the parent hook: return the command-line arguments."""
        port_flag = "--port=%d" % self.port
        return [port_flag] + self.service_args
该类继承了selenium.webdriver.common目录下 service 类,并重写了父类的command_line_args方法。构造方法中调用了父类的构造方法。
# selenium/webdriver/common/service.py
import errno
import os
import platform
import subprocess
from subprocess import PIPE
import time

from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common import utils

try:
    from subprocess import DEVNULL
    _HAS_NATIVE_DEVNULL = True
except ImportError:
    DEVNULL = -3
    _HAS_NATIVE_DEVNULL = False


class Service(object):
    """Base class that launches a driver executable as a child process."""

    def __init__(self, executable, port=0, log_file=DEVNULL, env=None, start_error_message=""):
        """
        Store the launch settings for the driver executable.

        :Args:
         - executable : path of the driver executable
         - port : port for the service; 0 means "pick a free port"
         - log_file : target for the child's stdout and stderr
         - env : environment for the child process; defaults to os.environ
         - start_error_message : hint appended to start-up error messages
        """
        self.path = executable

        self.port = port
        # By default a free port is obtained automatically.
        if self.port == 0:
            self.port = utils.free_port()

        # Without a native subprocess.DEVNULL, open the null device by hand.
        if not _HAS_NATIVE_DEVNULL and log_file == DEVNULL:
            log_file = open(os.devnull, 'wb')

        self.start_error_message = start_error_message
        self.log_file = log_file
        # By default the current process environment is used.
        self.env = env or os.environ

    @property
    def service_url(self):
        """
        Gets the url of the Service
        """
        return "http://%s" % utils.join_host_port('localhost', self.port)

    def command_line_args(self):
        """
        Return the command-line arguments for the executable.

        :Exceptions:
         - NotImplementedError : always, in this base class; sub classes
           must override the method.
        """
        # NotImplemented is a comparison sentinel, not an exception class;
        # calling it raised an unrelated TypeError. NotImplementedError is
        # the exception meant for "must be overridden".
        raise NotImplementedError("This method needs to be implemented in a sub class")

    def start(self):
        """
        Starts the Service.

        :Exceptions:
         - WebDriverException : Raised either when it can't start the service
           or when it can't connect to the service
        """
        try:
            # Launch the driver executable with the arguments supplied by
            # command_line_args(); stdout/stderr go to self.log_file.
            cmd = [self.path]
            cmd.extend(self.command_line_args())
            self.process = subprocess.Popen(cmd, env=self.env,
                                            close_fds=platform.system() != 'Windows',
                                            stdout=self.log_file,
                                            stderr=self.log_file,
                                            stdin=PIPE)
        except (TypeError, NotImplementedError):
            # Programming errors: propagate unchanged instead of reporting
            # them as a missing executable.
            raise
        except OSError as err:
            if err.errno == errno.ENOENT:
                raise WebDriverException(
                    "'%s' executable needs to be in PATH. %s" % (
                        os.path.basename(self.path), self.start_error_message)
                )
            elif err.errno == errno.EACCES:
                raise WebDriverException(
                    "'%s' executable may have wrong permissions. %s" % (
                        os.path.basename(self.path), self.start_error_message)
                )
            else:
                raise
        except Exception as e:
            raise WebDriverException(
                "The executable %s needs to be available in the path. %s\n%s" %
                (os.path.basename(self.path), self.start_error_message, str(e)))

        count = 0
        # Poll once per second: fail if the child process has exited, stop
        # as soon as the port is connectable, give up after 30 attempts.
        while True:
            self.assert_process_still_running()
            if self.is_connectable():
                break
            count += 1
            time.sleep(1)
            if count == 30:
                raise WebDriverException("Can not connect to the Service %s" % self.path)

    def assert_process_still_running(self):
        """Raise WebDriverException if the child process has already exited."""
        return_code = self.process.poll()
        if return_code is not None:
            raise WebDriverException(
                'Service %s unexpectedly exited. Status code was: %s'
                % (self.path, return_code)
            )

    def is_connectable(self):
        """Return the result of utils.is_connectable() for the service port."""
        # The 30-attempt wait lives in start(), not here.
        return utils.is_connectable(self.port)
由上代码可知Service的实例化会获取一个端口。
然后调用了service对象的start方法。该方法用subprocess启动chromedriver程序 并检测是否正在连接。
现在再来看最开始chrome 的webDriver类, 此类继承了selenium.webdriver.remote下的webdriver并调用了父类的构造方法。
#selenium/webdriver/remote/webdriver.pyimport warningsfrom selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriverfrom .remote_connection import ChromeRemoteConnectionfrom .service import Servicefrom .options import Optionsclass WebDriver(RemoteWebDriver): """ Controls the ChromeDriver and allows you to drive the browser. You will need to download the ChromeDriver executable from http://chromedriver.storage.googleapis.com/index.html """ def __init__(self, executable_path="chromedriver", port=0, options=None, service_args=None, desired_capabilities=None, service_log_path=None, chrome_options=None): #………………………………省略………………………………………… if options is None: # desired_capabilities stays as passed in if desired_capabilities is None: #第1步 创建一个浏览器的字典对象 desired_capabilities = self.create_options().to_capabilities() else: if desired_capabilities is None: desired_capabilities = options.to_capabilities() else: desired_capabilities.update(options.to_capabilities()) #………………………………省略………………………………………… #第二步调用 复类的构造方法 try: RemoteWebDriver.__init__( self, command_executor=ChromeRemoteConnection( remote_server_addr=self.service.service_url), desired_capabilities=desired_capabilities) except Exception: self.quit() raise self._is_remote = False def create_options(self): return Options()
首先通过create_options()创建一个Options对象,然后调用它的to_capabilities()方法得到浏览器的字典对象。
Options的to_capabilities()方法是返回一个caps字典对象
chrome浏览器返回的caps字典对象为:
{
'browserName': 'chrome',
'version': '',
'platform': 'ANY',
'goog:chromeOptions': {'extensions': [], 'args': []}
}
接下来看看 RemoteWebDriver的构造方法
# Call excerpt from the Chrome WebDriver constructor.
RemoteWebDriver.__init__(
    self,
    command_executor=ChromeRemoteConnection(
        remote_server_addr=self.service.service_url),
    desired_capabilities=desired_capabilities)
传入了2个参数 一个是 ChromeRemoteConnection类的实例对象, 一个是前面获取到的浏览器字典对象。
来看看ChromeRemoteConnection类。继承了RemoteConnection,调用了父类的构造方法并往self._commands添加了几个command键值对
# selenium/webdriver/chrome/remote_connection.py
from selenium.webdriver.remote.remote_connection import RemoteConnection


class ChromeRemoteConnection(RemoteConnection):
    """RemoteConnection that also registers the Chromium-specific commands."""

    def __init__(self, remote_server_addr, keep_alive=True):
        RemoteConnection.__init__(self, remote_server_addr, keep_alive)
        # Each entry maps a command name to (HTTP method, URL path template).
        chromium_commands = {
            "launchApp": ('POST', '/session/$sessionId/chromium/launch_app'),
            "setNetworkConditions": ('POST', '/session/$sessionId/chromium/network_conditions'),
            "getNetworkConditions": ('GET', '/session/$sessionId/chromium/network_conditions'),
        }
        self._commands.update(chromium_commands)
# selenium/webdriver/remote/remote_connection.py
class RemoteConnection(object):
    """A connection with the Remote WebDriver server.

    Communicates with the server using the WebDriver wire protocol:
    https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol"""

    def __init__(self, remote_server_addr, keep_alive=False, resolve_ip=True):
        # Attempt to resolve the hostname and get an IP address.
        self.keep_alive = keep_alive
        parsed_url = parse.urlparse(remote_server_addr)
        addr = parsed_url.hostname
        if parsed_url.hostname and resolve_ip:
            port = parsed_url.port or None
            if parsed_url.scheme == "https":
                # For https the hostname is kept as-is (no IP substitution).
                ip = parsed_url.hostname
            elif port and not common_utils.is_connectable(port, parsed_url.hostname):
                ip = None
                LOGGER.info('Could not connect to port {} on host '
                            '{}'.format(port, parsed_url.hostname))
            else:
                ip = common_utils.find_connectable_ip(parsed_url.hostname,
                                                      port=port)
            if ip:
                # Rebuild the URL with the resolved address, keeping the
                # port and any user:password credentials.
                netloc = ip
                addr = netloc
                if parsed_url.port:
                    netloc = common_utils.join_host_port(netloc,
                                                         parsed_url.port)
                if parsed_url.username:
                    auth = parsed_url.username
                    if parsed_url.password:
                        auth += ':%s' % parsed_url.password
                    netloc = '%s@%s' % (auth, netloc)
                remote_server_addr = parse.urlunparse(
                    (parsed_url.scheme, netloc, parsed_url.path,
                     parsed_url.params, parsed_url.query, parsed_url.fragment))
            else:
                LOGGER.info('Could not get IP address for host: %s' %
                            parsed_url.hostname)

        self._url = remote_server_addr
        if keep_alive:
            self._conn = httplib.HTTPConnection(
                str(addr), str(parsed_url.port), timeout=self._timeout)

        # Command name -> (HTTP method, URL path template).
        self._commands = {
            Command.STATUS: ('GET', '/status'),
            Command.NEW_SESSION: ('POST', '/session'),
            Command.GET_ALL_SESSIONS: ('GET', '/sessions'),
            Command.QUIT: ('DELETE', '/session/$sessionId'),
            Command.GET_CURRENT_WINDOW_HANDLE:
                ('GET', '/session/$sessionId/window_handle'),
            Command.W3C_GET_CURRENT_WINDOW_HANDLE:
                ('GET', '/session/$sessionId/window'),
            Command.GET_WINDOW_HANDLES:
                ('GET', '/session/$sessionId/window_handles'),
            # ................ omitted .................
        }

    # The method that finally sends a command to the remote server.
    def execute(self, command, params):
        """Look up `command`, fill in its URL template and send the request."""
        command_info = self._commands[command]
        assert command_info is not None, 'Unrecognised command %s' % command
        # Substitute placeholders such as $sessionId in the path template.
        path = string.Template(command_info[1]).substitute(params)
        if hasattr(self, 'w3c') and self.w3c and isinstance(params, dict) and 'sessionId' in params:
            del params['sessionId']
        data = utils.dump_json(params)
        url = '%s%s' % (self._url, path)
        return self._request(command_info[0], url, body=data)

    # Returns a dictionary holding the parsed JSON response.
    def _request(self, method, url, body=None):
        """
        Send an HTTP request to the remote server.

        :Args:
         - method - A string for the HTTP method to send the request with.
         - url - A string for the URL to send the request to.
         - body - A string for request body. Ignored unless method is POST or PUT.

        :Returns:
          A dictionary with the server's parsed JSON response.
        """
        LOGGER.debug('%s %s %s' % (method, url, body))

        parsed_url = parse.urlparse(url)
        headers = self.get_remote_connection_headers(parsed_url, self.keep_alive)
        resp = None
        if body and method != 'POST' and method != 'PUT':
            body = None

        if self.keep_alive:
            # NOTE(review): assumes self._conn.request() returns a response
            # object exposing .status/.data - confirm against the connection
            # type created in __init__.
            resp = self._conn.request(method, url, body=body, headers=headers)

            statuscode = resp.status
        else:
            http = urllib3.PoolManager(timeout=self._timeout)
            resp = http.request(method, url, body=body, headers=headers)

            statuscode = resp.status
            # Give the response a getheader() accessor when it lacks one.
            if not hasattr(resp, 'getheader'):
                if hasattr(resp.headers, 'getheader'):
                    resp.getheader = lambda x: resp.headers.getheader(x)
                elif hasattr(resp.headers, 'get'):
                    resp.getheader = lambda x: resp.headers.get(x)

        data = resp.data.decode('UTF-8')
        try:
            if 300 <= statuscode < 304:
                # Follow the redirect with a GET to the Location header.
                return self._request('GET', resp.getheader('location'))
            if 399 < statuscode <= 500:
                return {'status': statuscode, 'value': data}
            content_type = []
            if resp.getheader('Content-Type') is not None:
                content_type = resp.getheader('Content-Type').split(';')
            if not any([x.startswith('image/png') for x in content_type]):

                try:
                    data = utils.load_json(data.strip())
                except ValueError:
                    # Body is not JSON: derive the status from the HTTP code.
                    if 199 < statuscode < 300:
                        status = ErrorCode.SUCCESS
                    else:
                        status = ErrorCode.UNKNOWN_ERROR
                    return {'status': status, 'value': data.strip()}

                # Some of the drivers incorrectly return a response
                # with no 'value' field when they should return null.
                if 'value' not in data:
                    data['value'] = None
                return data
            else:
                data = {'status': 0, 'value': data}
                return data
        finally:
            LOGGER.debug("Finished Request")
            resp.close()
构造方法中主要是把localhost域名换成127.0.0.1,通过urllib.parse.urlparse把要处理的url解析成6个部分。
urlparse返回的是一个命名元组(named tuple)对象,包含scheme, netloc, path, params, query, fragment这6个字段。netloc包括hostname和port。
调用 common_utils.find_connectable_ip()方法获取hostname对应的ip地址,最后urllib.parse.urlunparse()重新组成url并赋值给self._url
初始化了self._commands 字典,key为命令名,value为由HTTP方法和URL路径组成的元组,例如('POST', '/session')。
RemoteConnection类的实例方法execute调用 _request方法最终实现发送命令到远程服务器。
它们是通过wire protocol(有线协议)通信的,这种协议是点对点方式进行通信的。首先客户端将要执行的操作(例如一次点击)转换成json格式的字符串,然后通过wire protocol协议传递给服务器
RemoteWebDriver类的构造方法 更新capabilities字典 主要调用start_session传入capabilities字典
start_session方法 根据capabilities字典创建一个新的会话并获取session_id。
另外还实例化了错误处理handle,文件查找file_detector(默认实例化是LocalFileDetector)。一个页面切换的SwitchTo对象。
# selenium/webdriver/remote/webdriver.py

# Maps legacy (OSS) capability names to their W3C equivalents.
_OSS_W3C_CONVERSION = {
    'acceptSslCerts': 'acceptInsecureCerts',
    'version': 'browserVersion',
    'platform': 'platformName'
}


def _make_w3c_caps(caps):
    """Makes a W3C alwaysMatch capabilities object.

    Filters out capability names that are not in the W3C spec. Spec-compliant
    drivers will reject requests containing unknown capability names.

    Moves the Firefox profile, if present, from the old location to the new Firefox
    options object.

    :Args:
     - caps - A dictionary of capabilities requested by the caller.
    """
    # Deep copy so the caller's dictionary is never modified.
    caps = copy.deepcopy(caps)
    # For Chrome there is no 'firefox_profile' key, so this is None.
    profile = caps.get('firefox_profile')
    always_match = {}
    if caps.get('proxy') and caps['proxy'].get('proxyType'):
        caps['proxy']['proxyType'] = caps['proxy']['proxyType'].lower()
    for k, v in caps.items():
        # Legacy key with a non-empty value: store it under its W3C name
        # (the 'platform' value is lower-cased).
        if v and k in _OSS_W3C_CONVERSION:
            always_match[_OSS_W3C_CONVERSION[k]] = v.lower() if k == 'platform' else v
        # Keys already W3C-valid, or vendor-prefixed (contain ':'), are
        # copied unchanged. _W3C_CAPABILITY_NAMES is defined elsewhere in
        # the module and is not part of this excerpt.
        if k in _W3C_CAPABILITY_NAMES or ':' in k:
            always_match[k] = v
    if profile:
        moz_opts = always_match.get('moz:firefoxOptions', {})
        # If it's already present, assume the caller did that intentionally.
        if 'profile' not in moz_opts:
            # Don't mutate the original capabilities.
            new_opts = copy.deepcopy(moz_opts)
            new_opts['profile'] = profile
            always_match['moz:firefoxOptions'] = new_opts
    return {"firstMatch": [{}], "alwaysMatch": always_match}


class WebDriver(object):

    _web_element_cls = WebElement

    def __init__(self, command_executor='http://127.0.0.1:4444/wd/hub',
                 desired_capabilities=None, browser_profile=None, proxy=None,
                 keep_alive=False, file_detector=None, options=None):
        """
        Create a driver that sends commands using the wire protocol.

        :Args:
         - command_executor - URL of the remote server
           ('http://127.0.0.1:<port>') as str/bytes, or an already-built
           connection object, which is used as-is.
         - desired_capabilities - A dictionary of capabilities to request when
           starting the browser session.
         - browser_profile - forwarded to start_session(); passing it emits
           a DeprecationWarning.
         - proxy - A selenium.webdriver.common.proxy.Proxy object. Optional;
           passing it emits a DeprecationWarning.
         - keep_alive - forwarded to RemoteConnection when command_executor
           is given as a URL.
         - file_detector - custom file detector object. Defaults to a
           LocalFileDetector() instance.
         - options - instance of an options.Options class

        :Exceptions:
         - WebDriverException - desired_capabilities is not a dictionary.
        """
        capabilities = {}
        if options is not None:
            capabilities = options.to_capabilities()
        if desired_capabilities is not None:
            if not isinstance(desired_capabilities, dict):
                raise WebDriverException("Desired Capabilities must be a dictionary")
            else:
                # Explicit desired capabilities override the option values.
                capabilities.update(desired_capabilities)
        if proxy is not None:
            warnings.warn("Please use FirefoxOptions to set proxy",
                          DeprecationWarning)
            proxy.add_to_capabilities(capabilities)
        self.command_executor = command_executor
        # A URL (bytes/str) is wrapped in a RemoteConnection.
        if type(self.command_executor) is bytes or isinstance(self.command_executor, str):
            self.command_executor = RemoteConnection(command_executor, keep_alive=keep_alive)
        self._is_remote = True
        # String id that identifies the browser session.
        self.session_id = None
        self.capabilities = {}
        # errorhandler.ErrorHandler: checks responses for errors.
        self.error_handler = ErrorHandler()
        self.start_client()
        if browser_profile is not None:
            warnings.warn("Please use FirefoxOptions to set browser profile",
                          DeprecationWarning)
        # Core step: start a session.
        self.start_session(capabilities, browser_profile)
        # Object used for switching between pages/contexts.
        self._switch_to = SwitchTo(self)
        # Mobile helper.
        self._mobile = Mobile(self)
        # Default to a LocalFileDetector *instance*; the excerpt previously
        # stored the class object itself.
        self.file_detector = file_detector or LocalFileDetector()

    def start_session(self, capabilities, browser_profile=None):
        """
        Create a new session from the capabilities dictionary.

        :Args:
         - capabilities - dictionary of capabilities for the session
         - browser_profile - a FirefoxProfile object (Firefox only)

        :Exceptions:
         - InvalidArgumentException - capabilities is not a dictionary.
        """
        if not isinstance(capabilities, dict):
            raise InvalidArgumentException("Capabilities must be a dictionary")
        if browser_profile:
            if "moz:firefoxOptions" in capabilities:
                capabilities["moz:firefoxOptions"]["profile"] = browser_profile.encoded
            else:
                capabilities.update({'firefox_profile': browser_profile.encoded})
        # For the Chrome example, _make_w3c_caps returns:
        # {
        #     "firstMatch": [{}],
        #     "alwaysMatch": {
        #         'browserName': 'chrome',
        #         'version': '',
        #         'platformName': 'any',
        #         'goog:chromeOptions': {'extensions': [], 'args': []}
        #     }
        # }
        w3c_caps = _make_w3c_caps(capabilities)
        # Both the W3C and the legacy form are sent.
        parameters = {"capabilities": w3c_caps,
                      "desiredCapabilities": capabilities}
        # Command.NEW_SESSION: ('POST', '/session')
        response = self.execute(Command.NEW_SESSION, parameters)
        if 'sessionId' not in response:
            response = response['value']
        # Remember the session id returned by the server.
        self.session_id = response['sessionId']
        self.capabilities = response.get('value')

        # if capabilities is none we are probably speaking to
        # a W3C endpoint
        if self.capabilities is None:
            self.capabilities = response.get('capabilities')

        # Double check to see if we have a W3C Compliant browser
        self.w3c = response.get('status') is None
        self.command_executor.w3c = self.w3c

    # Commands reach the remote server through self.command_executor.execute,
    # which is how the browser is controlled.
    def execute(self, driver_command, params=None):
        """
        Send driver_command through the command executor.

        :Args:
         - driver_command - name of the command to execute
         - params - dictionary of parameters sent with the command

        :Returns:
          A dictionary holding the command's JSON response.
        """
        if self.session_id is not None:
            if not params:
                params = {'sessionId': self.session_id}
            elif 'sessionId' not in params:
                params['sessionId'] = self.session_id

        # Wrap the outgoing values.
        params = self._wrap_value(params)
        # Core step: delegate to the command executor's execute().
        response = self.command_executor.execute(driver_command, params)
        if response:
            self.error_handler.check_response(response)
            # Unwrap the returned value.
            response['value'] = self._unwrap_value(
                response.get('value', None))
            return response
        # If the server doesn't send a response, assume the command was
        # a success
        return {'success': 0, 'value': None, 'sessionId': self.session_id}
driver.get('https://www.baidu.com')调用的是webdriver/remote/webdriver.py下的get方法
get方法调用了webdriver.py中WebDriver类的execute方法,该execute方法中的self.command_executor.execute实际调用的是remote_connection.py中RemoteConnection类的execute方法。
实际上是一个HTTP request给监听端口上的Web Service, 在我们的HTTP request的body中,会以WebDriver Wire协议规定的JSON格式的字符串来告诉Selenium我们希望浏览器打开'https://www.baidu.com'页面
# selenium/webdriver/remote/webdriver.py
def get(self, url):
    """
    Loads a web page in the current browser session.
    """
    # Command.GET: ('POST', '/session/$sessionId/url')
    params = {'url': url}
    self.execute(Command.GET, params)
总结一下:
首先是webdriver实例化Service 类调用start()方法用subprocess启动chromedriver(带--port参数)驱动。chromedriver启动之后都会在绑定的端口启动Web Service。
接着实例化RemoteConnection获得 command_executor实例化对象 传入给RemoteWebDriver构造方法。
RemoteWebDriver构造方法调用start_session()方法启动session并获得唯一的session_id(通过这个session_id来确定找到对方,且在多线程并行的时候彼此之间不会有冲突和干扰)
接下来调用WebDriver的任何API,比如get() 都需要借助一个CommandExecutor(RemoteConnection类的实例对象)调用execute()发送一个命令(这个命令来自CommandExecutor实例化时生成的command字典)。
#部分self._commands = { Command.STATUS: ('GET', '/status'), Command.NEW_SESSION: ('POST', '/session'), Command.GET_ALL_SESSIONS: ('GET', '/sessions'), Command.QUIT: ('DELETE', '/session/$sessionId'), Command.GET_CURRENT_WINDOW_HANDLE: ('GET', '/session/$sessionId/window_handle'), Command.W3C_GET_CURRENT_WINDOW_HANDLE: ('GET', '/session/$sessionId/window'), Command.GET_WINDOW_HANDLES: ('GET', '/session/$sessionId/window_handles'), #.................省略.....................}
CommandExecutor中的execute()方法最后返回_request()方法的结果,实际上是发送一个HTTP request给监听端口上的Web Service。
在HTTP request的body中,Wire JSON格式字典来告诉chromedriver接下来做什么事。(通过之前绑定的端口)
实际的执行者是chromedriver驱动,而selenium就相当于一个代理。所以selenium并不是直接操控浏览器而是运行webdriver, 通过webdriver间接操控浏览器。
在现实生活中这类似打出租车,我们告诉司机目的地是哪?走哪条路到达?webdriver就相当于出租车司机。