From 552815c77b5dbcdb58a4e1213f1bb79aeb3f6ed5 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Fri, 29 May 2026 20:22:43 +0800 Subject: [PATCH 1/8] Modernize dependency compatibility --- .gitignore | 3 +- README.md | 4 +- feapder/db/redisdb.py | 31 +- feapder/network/proxy_pool_old.py | 2 +- feapder/requirements.txt | 27 +- feapder/utils/webdriver/selenium_driver.py | 323 +++++++++------------ setup.py | 33 +-- tests/test_dependency_modernization.py | 115 ++++++++ 8 files changed, 295 insertions(+), 243 deletions(-) create mode 100644 tests/test_dependency_modernization.py diff --git a/.gitignore b/.gitignore index fedead23..721f067a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ files/* +docs/superpowers/ .DS_Store .idea/* */.idea/* @@ -15,4 +16,4 @@ dist/ media/ .MWebMetaData/ push.sh -assets/ \ No newline at end of file +assets/ diff --git a/README.md b/README.md index 7bde6250..f13682fc 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务, ### Rapidproxy代理 - + @@ -133,7 +133,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务, ### NovProxy - +g diff --git a/feapder/db/redisdb.py b/feapder/db/redisdb.py index d882e687..ee4e0cd9 100644 --- a/feapder/db/redisdb.py +++ b/feapder/db/redisdb.py @@ -6,11 +6,11 @@ --------- @author: Boris """ -import os import time from typing import Union, List import redis +from redis.cluster import ClusterNode, RedisCluster from redis.connection import Encoder as _Encoder from redis.exceptions import ConnectionError, TimeoutError from redis.exceptions import DataError @@ -140,29 +140,24 @@ def get_connect(self): startup_nodes = [] for ip_port in ip_ports: ip, port = ip_port.split(":") - startup_nodes.append({"host": ip, "port": port}) + startup_nodes.append(ClusterNode(ip, int(port))) if self._service_name: # log.debug("使用redis哨兵模式") - hosts = [(node["host"], node["port"]) for node in startup_nodes] + hosts = [(node.host, node.port) for node in startup_nodes] sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs) self._redis = sentinel.master_for( self._service_name, password=self._user_pass, db=self._db, - redis_class=redis.StrictRedis, + redis_class=redis.Redis, decode_responses=self._decode_responses, max_connections=self._max_connections, **self._kwargs, ) + self._is_redis_cluster = False else: - try: - from rediscluster import RedisCluster - except ModuleNotFoundError as e: - log.error('请安装 pip install "feapder[all]"') - os._exit(0) - # log.debug("使用redis集群模式") self._redis = RedisCluster( startup_nodes=startup_nodes, @@ -171,11 +166,10 @@ def get_connect(self): max_connections=self._max_connections, **self._kwargs, ) - - self._is_redis_cluster = True + self._is_redis_cluster = True else: ip, port = ip_ports[0].split(":") - self._redis = redis.StrictRedis( + self._redis = redis.Redis( host=ip, port=port, db=self._db, @@ -186,7 +180,7 @@ def get_connect(self): ) self._is_redis_cluster = False else: - self._redis = redis.StrictRedis.from_url( + self._redis = redis.Redis.from_url( self._url, decode_responses=self._decode_responses, **self._kwargs ) self._is_redis_cluster = False @@ -573,7 +567,8 @@ def zexists(self, table, values): if isinstance(values, list): pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for value in values: pipe.zscore(table, value) is_exists_temp = pipe.execute() @@ -773,7 +768,8 @@ def setbit( else: assert len(offsets) == len(values), "offsets值要与values值一一对应" pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for offset, value in zip(offsets, values): pipe.setbit(table, offset, value) @@ -792,7 +788,8 @@ def getbit(self, table, offsets): """ if isinstance(offsets, list): pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for offset in offsets: pipe.getbit(table, offset) diff --git a/feapder/network/proxy_pool_old.py b/feapder/network/proxy_pool_old.py index 2e3bb6c1..fddaabc1 100644 --- a/feapder/network/proxy_pool_old.py +++ b/feapder/network/proxy_pool_old.py @@ -147,7 +147,7 @@ def get_proxy_from_redis(proxy_source_url, **kwargs): @return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'http://xxx.xxx.xxx.xxx:xxx'}] """ - redis_conn = redis.StrictRedis.from_url(proxy_source_url) + redis_conn = redis.Redis.from_url(proxy_source_url) key = kwargs.get("redis_proxies_key") assert key, "从redis中获取代理 需要指定 redis_proxies_key" proxies = redis_conn.zrange(key, 0, -1) diff --git a/feapder/requirements.txt b/feapder/requirements.txt index 21717674..888d27f2 100644 --- a/feapder/requirements.txt +++ b/feapder/requirements.txt @@ -1,21 +1,20 @@ better-exceptions>=0.2.2 -DBUtils>=2.0 -parsel>=1.5.2 +DBUtils>=3.0 +parsel>=1.8.1 PyExecJS>=1.5.1 -pymongo>=3.10.1 -PyMySQL>=0.9.3 -redis>=2.10.6,<4.0.0 -requests>=2.22.0 -selenium>=3.141.0 -bs4>=0.0.1 -ipython>=7.14.0 -bitarray>=1.5.3 -redis-py-cluster>=2.1.0 -cryptography>=3.3.2 -urllib3>=1.25.8 +pymongo>=4.0.0 +PyMySQL>=1.1.0 +redis>=5.0.0,<9.0.0 +requests>=2.31.0 +selenium>=4.10.0 +beautifulsoup4>=4.12.0 +ipython>=8.0.0 +bitarray>=2.8.0 +cryptography>=41.0.0 +urllib3>=2.0.0,<3.0.0 loguru>=0.5.3 influxdb>=5.3.1 pyperclip>=1.8.2 webdriver-manager>=4.0.0 terminal-layout>=2.1.3 -playwright \ No newline at end of file +playwright>=1.40.0 diff --git a/feapder/utils/webdriver/selenium_driver.py b/feapder/utils/webdriver/selenium_driver.py index 9f46d54b..06eb5493 100644 --- a/feapder/utils/webdriver/selenium_driver.py +++ b/feapder/utils/webdriver/selenium_driver.py @@ -8,13 +8,13 @@ @email: boris_liu@foxmail.com """ +import inspect import json import logging import os from typing import Optional, Union, List from selenium import webdriver -from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.firefox import GeckoDriverManager @@ -33,41 +33,7 @@ class SeleniumDriver(WebDriver, RemoteWebDriver): PHANTOMJS = "PHANTOMJS" FIREFOX = "FIREFOX" - __CHROME_ATTRS__ = { - "executable_path", - "port", - "options", - "service_args", - "desired_capabilities", - "service_log_path", - "chrome_options", - "keep_alive", - } - - __EDGE_ATTRS__ = __CHROME_ATTRS__ - - __FIREFOX_ATTRS__ = { - "firefox_profile", - "firefox_binary", - "timeout", - "capabilities", - "proxy", - "executable_path", - "options", - "service_log_path", - "firefox_options", - "service_args", - "desired_capabilities", - "log_path", - "keep_alive", - } - __PHANTOMJS_ATTRS__ = { - "executable_path", - "port", - "desired_capabilities", - "service_args", - "service_log_path", - } + __DRIVER_ATTRS__ = {"keep_alive"} def __init__(self, xhr_url_regexes: list = None, **kwargs): """ @@ -131,41 +97,117 @@ def filter_kwargs(self, kwargs: dict, driver_attrs: set): return data + def get_options(self, default_options, *option_keys): + for option_key in option_keys: + options = self._kwargs.get(option_key) + if options is not None: + return options + + return default_options + + def get_driver_kwargs(self): + return self.filter_kwargs(self._kwargs, self.__DRIVER_ATTRS__) + + def apply_capabilities(self, options, *capability_keys): + for capability_key in capability_keys: + capabilities = self._kwargs.get(capability_key) + if not capabilities: + continue + + for key, value in capabilities.items(): + options.set_capability(key, value) + + return options + + def build_service(self, service_cls, driver_manager_cls=None): + service = self._kwargs.get("service") + if service is not None: + return service + + service_kwargs = {} + service_args = self._kwargs.get("service_args") + if service_args is not None: + service_kwargs["service_args"] = service_args + + port = self._kwargs.get("port") + if port is not None: + service_kwargs["port"] = port + + log_path = self._kwargs.get("service_log_path") + if log_path is None: + log_path = self._kwargs.get("log_path") + if log_path is not None: + log_param = ( + "log_output" + if "log_output" in inspect.signature(service_cls).parameters + else "log_path" + ) + service_kwargs[log_param] = log_path + + if self._executable_path: + return service_cls(self._executable_path, **service_kwargs) + + if self._auto_install_driver and driver_manager_cls is not None: + return service_cls(driver_manager_cls().install(), **service_kwargs) + + if service_kwargs: + return service_cls(**service_kwargs) + + return None + + def create_driver(self, driver_cls, options, service): + kwargs = self.get_driver_kwargs() + if service is not None: + kwargs["service"] = service + + return driver_cls(options=options, **kwargs) + + def get_proxy(self): + return self._proxy() if callable(self._proxy) else self._proxy + + def get_user_agent(self): + return self._user_agent() if callable(self._user_agent) else self._user_agent + def get_driver(self): return self.driver def firefox_driver(self): - if webdriver.__version__ >= "4.0.0": - raise Exception( - f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器" - ) + from selenium.webdriver.firefox.service import Service - firefox_profile = webdriver.FirefoxProfile() - firefox_options = webdriver.FirefoxOptions() - firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX - try: - from selenium.webdriver.firefox.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + firefox_options = self.get_options( + webdriver.FirefoxOptions(), "options", "firefox_options" + ) + firefox_profile = self._kwargs.get("firefox_profile") + if firefox_profile is not None: + firefox_options.profile = firefox_profile + firefox_binary = self._kwargs.get("firefox_binary") + if firefox_binary is not None: + firefox_options.binary_location = ( + getattr(firefox_binary, "path", None) + or getattr(firefox_binary, "_start_cmd", None) + or firefox_binary + ) + self.apply_capabilities(firefox_options, "desired_capabilities", "capabilities") if self._proxy: - proxy = self._proxy() if callable(self._proxy) else self._proxy - firefox_capabilities["marionette"] = True - firefox_capabilities["proxy"] = { - "proxyType": "MANUAL", - "httpProxy": proxy, - "ftpProxy": proxy, - "sslProxy": proxy, - } + proxy = self.get_proxy() + firefox_options.set_capability( + "proxy", + { + "proxyType": "MANUAL", + "httpProxy": proxy, + "ftpProxy": proxy, + "sslProxy": proxy, + }, + ) if self._user_agent: - firefox_profile.set_preference( - "general.useragent.override", - self._user_agent() if callable(self._user_agent) else self._user_agent, + firefox_options.set_preference( + "general.useragent.override", self.get_user_agent() ) if not self._load_images: - firefox_profile.set_preference("permissions.default.image", 2) + firefox_options.set_preference("permissions.default.image", 2) if self._headless: firefox_options.add_argument("--headless") @@ -176,25 +218,8 @@ def firefox_driver(self): for arg in self._custom_argument: firefox_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__) - - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - kwargs.update(executable_path=GeckoDriverManager().install()) - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - kwargs.update(service=Service(GeckoDriverManager().install())) - - driver = webdriver.Firefox( - capabilities=firefox_capabilities, - options=firefox_options, - firefox_profile=firefox_profile, - **kwargs, - ) + service = self.build_service(Service, GeckoDriverManager) + driver = self.create_driver(webdriver.Firefox, firefox_options, service) if self._window_size: driver.set_window_size(*self._window_size) @@ -202,31 +227,22 @@ def firefox_driver(self): return driver def chrome_driver(self): - chrome_options = webdriver.ChromeOptions() + chrome_options = self.get_options( + webdriver.ChromeOptions(), "options", "chrome_options" + ) # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 chrome_options.add_argument("--no-sandbox") - try: - from selenium.webdriver.chrome.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + from selenium.webdriver.chrome.service import Service + + self.apply_capabilities(chrome_options, "desired_capabilities") if self._proxy: - chrome_options.add_argument( - "--proxy-server={}".format( - self._proxy() if callable(self._proxy) else self._proxy - ) - ) + chrome_options.add_argument("--proxy-server={}".format(self.get_proxy())) if self._user_agent: - chrome_options.add_argument( - "user-agent={}".format( - self._user_agent() - if callable(self._user_agent) - else self._user_agent - ) - ) + chrome_options.add_argument("user-agent={}".format(self.get_user_agent())) if not self._load_images: chrome_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} @@ -254,19 +270,8 @@ def chrome_driver(self): for arg in self._custom_argument: chrome_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - kwargs.update(executable_path=ChromeDriverManager().install()) - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - kwargs.update(service=Service(ChromeDriverManager().install())) - - driver = webdriver.Chrome(options=chrome_options, **kwargs) + service = self.build_service(Service, ChromeDriverManager) + driver = self.create_driver(webdriver.Chrome, chrome_options, service) # 隐藏浏览器特征 if self._use_stealth_js: @@ -293,44 +298,30 @@ def chrome_driver(self): ) if self._download_path: - driver.command_executor._commands["send_command"] = ( - "POST", - "/session/$sessionId/chromium/send_command", + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": self._download_path}, ) - params = { - "cmd": "Page.setDownloadBehavior", - "params": {"behavior": "allow", "downloadPath": self._download_path}, - } - driver.execute("send_command", params) return driver def edge_driver(self): - edge_options = webdriver.EdgeOptions() + edge_options = self.get_options( + webdriver.EdgeOptions(), "options", "edge_options" + ) # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium edge_options.add_experimental_option("excludeSwitches", ["enable-automation"]) edge_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 edge_options.add_argument("--no-sandbox") - try: - from selenium.webdriver.edge.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + from selenium.webdriver.edge.service import Service + + self.apply_capabilities(edge_options, "desired_capabilities") if self._proxy: - edge_options.add_argument( - "--proxy-server={}".format( - self._proxy() if callable(self._proxy) else self._proxy - ) - ) + edge_options.add_argument("--proxy-server={}".format(self.get_proxy())) if self._user_agent: - edge_options.add_argument( - "user-agent={}".format( - self._user_agent() - if callable(self._user_agent) - else self._user_agent - ) - ) + edge_options.add_argument("user-agent={}".format(self.get_user_agent())) if not self._load_images: edge_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} @@ -358,19 +349,8 @@ def edge_driver(self): for arg in self._custom_argument: edge_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - raise NotImplementedError("edge not support auto install driver") - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - raise NotImplementedError("edge not support auto install driver") - - driver = webdriver.Edge(options=edge_options, **kwargs) + service = self.build_service(Service) + driver = self.create_driver(webdriver.Edge, edge_options, service) # 隐藏浏览器特征 if self._use_stealth_js: @@ -397,58 +377,19 @@ def edge_driver(self): ) if self._download_path: - driver.command_executor._commands["send_command"] = ( - "POST", - "/session/$sessionId/chromium/send_command", + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": self._download_path}, ) - params = { - "cmd": "Page.setDownloadBehavior", - "params": {"behavior": "allow", "downloadPath": self._download_path}, - } - driver.execute("send_command", params) return driver def phantomjs_driver(self): - import warnings - - warnings.filterwarnings("ignore") - - service_args = [] - dcap = DesiredCapabilities.PHANTOMJS - - if self._proxy: - service_args.append( - "--proxy=%s" % self._proxy() if callable(self._proxy) else self._proxy - ) - if self._user_agent: - dcap["phantomjs.page.settings.userAgent"] = ( - self._user_agent() if callable(self._user_agent) else self._user_agent - ) - if not self._load_images: - service_args.append("--load-images=no") - - # 添加自定义的配置参数 - if self._custom_argument: - for arg in self._custom_argument: - service_args.append(arg) - - kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__) - - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - - driver = webdriver.PhantomJS( - service_args=service_args, desired_capabilities=dcap, **kwargs + raise NotImplementedError( + "PhantomJS is not supported by Selenium 4. " + "Please use CHROME, EDGE, or FIREFOX." ) - if self._window_size: - driver.set_window_size(self._window_size[0], self._window_size[1]) - - del warnings - - return driver - @property def domain(self): return tools.get_domain(self.url or self.driver.current_url) diff --git a/setup.py b/setup.py index cf4fe542..14542b97 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,8 @@ import setuptools -if version_info < (3, 6, 0): - raise SystemExit("Sorry! feapder requires python 3.6.0 or later.") +if version_info < (3, 9, 0): + raise SystemExit("Sorry! feapder requires python 3.9.0 or later.") with open(join(dirname(__file__), "feapder/VERSION"), "rb") as fh: version = fh.read().decode("ascii").strip() @@ -35,15 +35,15 @@ requires = [ "better-exceptions>=0.2.2", - "DBUtils>=2.0", - "parsel>=1.5.2", - "PyMySQL>=0.9.3", - "redis>=2.10.6,<4.0.0", - "requests>=2.22.0", - "bs4>=0.0.1", - "ipython>=7.14.0", - "cryptography>=3.3.2", - "urllib3>=1.25.8", + "DBUtils>=3.0", + "parsel>=1.8.1", + "PyMySQL>=1.1.0", + "redis>=5.0.0,<9.0.0", + "requests>=2.31.0", + "beautifulsoup4>=4.12.0", + "ipython>=8.0.0", + "cryptography>=41.0.0", + "urllib3>=2.0.0,<3.0.0", "loguru>=0.5.3", "influxdb>=5.3.1", "pyperclip>=1.8.2", @@ -52,15 +52,14 @@ render_requires = [ "webdriver-manager>=4.0.0", - "playwright", - "selenium>=3.141.0", + "playwright>=1.40.0", + "selenium>=4.10.0", ] all_requires = [ - "bitarray>=1.5.3", + "bitarray>=2.8.0", "PyExecJS>=1.5.1", - "pymongo>=3.10.1", - "redis-py-cluster>=2.1.0", + "pymongo>=4.0.0", ] + render_requires setuptools.setup( @@ -69,7 +68,7 @@ author="Boris", license="MIT", author_email="feapder@qq.com", - python_requires=">=3.6", + python_requires=">=3.9", description="feapder是一款支持分布式、批次采集、数据防丢、报警丰富的python爬虫框架", long_description=long_description, long_description_content_type="text/markdown", diff --git a/tests/test_dependency_modernization.py b/tests/test_dependency_modernization.py new file mode 100644 index 00000000..e9a73b2a --- /dev/null +++ b/tests/test_dependency_modernization.py @@ -0,0 +1,115 @@ +import inspect + +from feapder.db.redisdb import RedisDB +from feapder.utils.webdriver.selenium_driver import SeleniumDriver + + +class OldStyleService: + def __init__( + self, executable_path=None, port=0, service_args=None, log_path=None, **kwargs + ): + self.executable_path = executable_path + self.port = port + self.service_args = service_args + self.log_path = log_path + + +class NewStyleService: + def __init__( + self, executable_path=None, port=0, service_args=None, log_output=None, **kwargs + ): + self.executable_path = executable_path + self.port = port + self.service_args = service_args + self.log_output = log_output + + +class FakeBrowser: + def set_window_size(self, *args): + self.window_size = args + + +def make_selenium_driver(**kwargs): + driver = object.__new__(SeleniumDriver) + driver._kwargs = kwargs + driver._executable_path = kwargs.pop("executable_path", "/tmp/driver") + driver._auto_install_driver = False + driver._proxy = None + driver._user_agent = None + driver._load_images = True + driver._headless = False + driver._custom_argument = None + driver._window_size = None + return driver + + +def test_selenium_service_log_path_supports_old_and_new_service_api(): + driver = make_selenium_driver( + service_log_path="/tmp/webdriver.log", service_args=["--verbose"], port=1234 + ) + + old_service = driver.build_service(OldStyleService) + new_service = driver.build_service(NewStyleService) + + assert old_service.executable_path == "/tmp/driver" + assert old_service.service_args == ["--verbose"] + assert old_service.port == 1234 + assert old_service.log_path == "/tmp/webdriver.log" + assert new_service.executable_path == "/tmp/driver" + assert new_service.service_args == ["--verbose"] + assert new_service.port == 1234 + assert new_service.log_output == "/tmp/webdriver.log" + + +def test_selenium_firefox_binary_maps_to_options_binary_location(): + driver = make_selenium_driver(firefox_binary="/tmp/firefox") + captured = {} + + def create_driver(driver_cls, options, service): + captured["options"] = options + captured["service"] = service + return FakeBrowser() + + driver.create_driver = create_driver + driver.build_service = lambda *args, **kwargs: None + + assert driver.firefox_driver() is not None + assert captured["options"].binary_location == "/tmp/firefox" + assert captured["service"] is None + + +def test_selenium_driver_kwargs_keep_public_constructor_args_internal(): + driver = make_selenium_driver( + keep_alive=False, + executable_path="/tmp/driver", + desired_capabilities={"acceptInsecureCerts": True}, + service_args=["--verbose"], + ) + + assert driver.get_driver_kwargs() == {"keep_alive": False} + + +def test_redisdb_public_api_signatures_are_preserved(): + expected = { + "__init__": [ + "self", + "ip_ports", + "db", + "user_pass", + "url", + "decode_responses", + "service_name", + "max_connections", + "kwargs", + ], + "from_url": ["url"], + "zadd": ["self", "table", "values", "prioritys"], + "zget": ["self", "table", "count", "is_pop"], + "zexists": ["self", "table", "values"], + "setbit": ["self", "table", "offsets", "values"], + "getbit": ["self", "table", "offsets"], + } + + for method_name, parameters in expected.items(): + signature = inspect.signature(getattr(RedisDB, method_name)) + assert list(signature.parameters) == parameters From 6094ec4a4d94e67f8502d630947ea0f1248c05ac Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Sun, 31 May 2026 18:04:22 +0800 Subject: [PATCH 2/8] docs: add runtime core refactor design --- ...2026-05-31-runtime-core-refactor-design.md | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md diff --git a/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md new file mode 100644 index 00000000..08b34b6f --- /dev/null +++ b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md @@ -0,0 +1,214 @@ +# Core Runtime Refactor Design + +Date: 2026-05-31 + +## Scope + +This refactor focuses on the runtime path that moves work through feapder: + +- `Scheduler` +- `Collector` +- `ParserControl` +- `RequestBuffer` +- `ItemBuffer` + +The first implementation phase keeps the public spider APIs compatible. It does not change how users subclass `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`, and it does not redesign their inheritance yet. The spider inheritance cleanup remains a follow-up phase after the runtime path is safer and better covered by tests. + +## Current Problems + +The runtime path works, but several responsibilities are tangled: + +- Thread lifecycle state is duplicated. Multiple classes maintain `_thread_stop`, `_is_adding_to_db`, `is_show_tip`, queue checks, and direct `_started.clear()` calls in slightly different ways. +- `ParserControl.deal_request` is too large. It handles parser lookup, download middleware, response fetching, validation, callback execution, result routing, retry, failed request persistence, metrics, browser release, and request deletion in one method. +- `ParserControl` and `AirSpiderParserControl` duplicate most request handling logic while differing mainly in queue backend and completion semantics. +- `Scheduler.all_thread_is_done()` depends on implementation details from collector and buffers. This makes shutdown sensitive to timing and temporary state. +- `RequestBuffer` and `ItemBuffer` both combine queueing, flushing, retry callbacks, persistence, and thread control. Their idle states are not explicit enough for a scheduler to reason about safely. +- Result handling rules are repeated in several places. A yielded value can be `Request`, `Item`, `UpdateItem`, callable, or invalid, but the classification logic is not centralized. +- Data safety concerns such as `eval`-based deserialization exist in adjacent runtime code. Replacing serialization is valuable, but it is not part of the first implementation slice unless touched code needs a small compatibility wrapper. + +## Goals + +1. Make runtime thread state explicit and consistent. +2. Split request processing into small units with clear responsibilities. +3. Reduce duplication between Redis-backed and in-memory parser control paths. +4. Make scheduler shutdown decisions depend on a small runtime status interface instead of scattered implementation details. +5. Preserve existing user-facing behavior, settings, Redis key formats, retry semantics, and callback semantics. +6. Add lightweight tests that do not require Redis, MySQL, browsers, or network access. + +## Non-Goals + +- Do not change the public constructor signatures of `Scheduler`, `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`. +- Do not rewrite the storage format for requests, responses, failed items, or failed requests in this phase. +- Do not require Redis or MySQL for the new unit tests. +- Do not remove existing settings or documented behavior. +- Do not perform the spider inheritance refactor in this phase. + +## Proposed Architecture + +### Runtime Worker State + +Introduce a small shared lifecycle helper for runtime threads. The helper should expose: + +- `request_stop()` +- `is_stop_requested` +- `mark_busy()` +- `mark_idle()` +- `is_idle` + +`Collector`, `ParserControl`, `RequestBuffer`, and `ItemBuffer` can keep inheriting from `threading.Thread`, but their public state checks should move toward explicit methods: + +- `is_idle()` +- `pending_count()` +- `is_stopped()` where useful + +This allows `Scheduler` to ask each component for status instead of reading several unrelated flags and queues. + +### Runtime Status Interface + +Add a small internal status contract used by `Scheduler.all_thread_is_done()`: + +- Collector is done when it is idle and has no pending local or backend requests. +- Parser controls are done when each worker is idle. +- Request buffer is done when it has no queued writes, no queued deletions, and is not flushing. +- Item buffer is done when it has no queued items and is not flushing. + +The scheduler should keep the current three-pass stability check, but delegate each component check to the component itself. + +### Parser Request Processing + +Split `ParserControl.deal_request` into private units: + +- `_find_parser(request)` +- `_prepare_response(parser, request)` +- `_run_callback(parser, request, response)` +- `_dispatch_results(parser, request, results, request_redis)` +- `_handle_exception(parser, request, response, error, request_redis, used_download_midware)` +- `_finish_request(request_redis, finish_action)` +- `_sleep_after_request()` + +The method `deal_request` remains as the public entry point for compatibility, but becomes orchestration code. This makes retry and success paths easier to test independently. + +### Result Dispatcher + +Introduce an internal result dispatcher for `Request`, `Item`, and callable values. It should encode the current behavior: + +- `Request` values receive a default `parser_name`. +- Synchronous requests are processed immediately. +- Asynchronous requests go to `RequestBuffer`. +- `Item` and `UpdateItem` values go to `ItemBuffer`. +- Callable values follow the current "previous result type" rule: after an item they are item callbacks; otherwise they are request callbacks. +- Invalid non-`None` values raise `TypeError` with the parser and callback name. + +For the first phase, wire this dispatcher into `ParserControl`. Later phases can reuse it from `Scheduler`, `Spider`, `TaskSpider`, and `BatchSpider`. + +### ParserControl Variants + +Keep both `ParserControl` and `AirSpiderParserControl` classes for compatibility, but move shared behavior into a base implementation. The variants should differ only in: + +- How they receive work. +- How they mark a request complete. +- Whether failed requests are persisted to Redis. + +This avoids changing `AirSpider` behavior while removing duplicated parsing, retry, middleware, and result handling logic. + +### Buffer Semantics + +`RequestBuffer` should make these states explicit: + +- queued new requests +- queued deletion requests +- currently flushing + +`ItemBuffer` should make these states explicit: + +- queued items or callbacks +- currently flushing +- export retry counters + +Both buffers should keep `flush()` as a synchronous method. Their thread loop continues calling `flush()` periodically. `stop()` should request shutdown, and the scheduler should still call explicit flushing before treating the runtime as complete. + +### Error Handling + +The refactor must preserve current error behavior: + +- Request download exceptions continue to increment download exception metrics. +- Parser exceptions continue to increment parser exception metrics. +- Proxy deletion behavior remains based on `PROXY_MAX_FAILED_TIMES`. +- `exception_request` and `failed_request` compatibility hooks remain intact. +- Browser instances are returned to the render downloader in a `finally` block. +- The original request is preserved when download middleware mutates a request and a retry or failed-request persistence is needed. + +Error handling should become easier to read by returning a small internal result from `_handle_exception`, such as whether to delete the active Redis request via the request buffer or via the item buffer. + +## Data Flow + +1. `Scheduler` starts buffers, collector, parser workers, retry handlers, and optional start request distribution. +2. `Collector` atomically reserves ready Redis requests and exposes them through its local queue. +3. `ParserControl` obtains one request, marks itself busy, finds the parser, downloads or reuses a response, validates it, runs the callback, and dispatches yielded results. +4. `RequestBuffer` persists new requests and deletion markers. +5. `ItemBuffer` persists items and deletes completed Redis requests after item persistence succeeds. +6. `Scheduler` polls component status. It ends the spider only after all components report stable idle status. + +For `AirSpider`, the same parser-processing flow applies, but the request source is `MemoryDB` and request completion does not use Redis deletion markers. + +## Compatibility Strategy + +- Keep class names and imports stable. +- Keep public method names such as `deal_request`, `flush`, `stop`, `is_not_task`, `get_requests_count`, and `is_adding_to_db`. +- Add new internal methods instead of removing old methods immediately. +- Preserve existing Redis key names and serialized values. +- Preserve callback ordering and the current callable routing rule. +- Preserve warning, logging, and metric names unless a test proves an existing message is wrong. + +## Testing Strategy + +Add focused unit tests around the new internal boundaries: + +- Collector idle and pending status with a fake RedisDB. +- RequestBuffer status before enqueue, during flush, and after flush with a fake DB. +- ItemBuffer status before enqueue, during flush, and after successful fake pipeline export. +- ParserControl result dispatch for `Request`, synchronous `Request`, `Item`, callback, and invalid result. +- ParserControl retry behavior for a controlled exception without network access. +- Scheduler completion check using fake components that transition from busy to idle. + +Existing integration tests can remain as broader coverage. This phase should not require Redis, MySQL, Playwright, Selenium, or live HTTP. + +## Implementation Phases + +### Phase 1: Status Surfaces + +Add explicit idle and pending methods to collector and buffers. Update `Scheduler.all_thread_is_done()` to use these methods while preserving the three-pass stability check. + +### Phase 2: ParserControl Extraction + +Split `ParserControl.deal_request` into smaller private methods without changing behavior. Add tests for the extracted methods using fake parsers, fake buffers, and fake requests. + +### Phase 3: Shared Parser Runtime + +Move duplicated logic from `ParserControl` and `AirSpiderParserControl` into shared helpers or a shared base class. Keep the concrete classes as compatibility wrappers. + +### Phase 4: Runtime Result Dispatcher + +Introduce the internal dispatcher and wire it into parser controls. Keep legacy result behavior intact. + +### Phase 5: Buffer Stop and Flush Hardening + +Make stop behavior explicit. Ensure a stopped buffer can report pending work accurately and that final flushes are deterministic. + +## Acceptance Criteria + +- Public spider examples in the docs continue to run with the same API. +- `Scheduler.all_thread_is_done()` no longer reads buffer and parser internals directly when an explicit status method exists. +- `ParserControl.deal_request` is substantially shorter and delegates download, callback, dispatch, exception, and completion responsibilities. +- `ParserControl` and `AirSpiderParserControl` share request-processing behavior instead of duplicating it. +- New lightweight tests cover the status and dispatch behavior without external services. +- Existing compatible tests still pass in the available local environment. + +## Follow-Up Work + +After this runtime refactor, a separate design should address the spider type hierarchy. The likely direction is a shared `BaseSpider` plus a distributed branch: + +- `BaseSpider -> AirSpider` +- `BaseSpider -> Spider -> TaskSpider -> BatchSpider` + +That work should wait until the runtime path has clearer component boundaries and tests, because changing inheritance before stabilizing runtime behavior would make regressions harder to isolate. From 85995399df0ac44d2f309ceb63c45038093549b9 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 09:40:00 +0800 Subject: [PATCH 3/8] refactor: add runtime state helper --- feapder/core/runtime_state.py | 48 ++++++++++++++++++++++++++++++ tests/test_runtime_state.py | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 feapder/core/runtime_state.py create mode 100644 tests/test_runtime_state.py diff --git a/feapder/core/runtime_state.py b/feapder/core/runtime_state.py new file mode 100644 index 00000000..a8cac210 --- /dev/null +++ b/feapder/core/runtime_state.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +Small runtime state helper for background runtime threads. +""" +from contextlib import contextmanager +from threading import RLock + + +class RuntimeState: + def __init__(self): + self._stop_requested = False + self._busy_count = 0 + self._lock = RLock() + + def request_stop(self): + with self._lock: + self._stop_requested = True + + @property + def is_stop_requested(self): + with self._lock: + return self._stop_requested + + def mark_busy(self): + with self._lock: + self._busy_count += 1 + + def mark_idle(self): + with self._lock: + if self._busy_count > 0: + self._busy_count -= 1 + + @property + def busy_count(self): + with self._lock: + return self._busy_count + + @property + def is_idle(self): + return self.busy_count == 0 + + @contextmanager + def busy(self): + self.mark_busy() + try: + yield + finally: + self.mark_idle() diff --git a/tests/test_runtime_state.py b/tests/test_runtime_state.py new file mode 100644 index 00000000..0c8a9bc0 --- /dev/null +++ b/tests/test_runtime_state.py @@ -0,0 +1,56 @@ +from feapder.core.runtime_state import RuntimeState + + +def test_runtime_state_starts_idle_and_running(): + state = RuntimeState() + + assert state.is_idle is True + assert state.is_stop_requested is False + assert state.busy_count == 0 + + +def test_runtime_state_tracks_busy_count(): + state = RuntimeState() + + state.mark_busy() + state.mark_busy() + + assert state.is_idle is False + assert state.busy_count == 2 + + state.mark_idle() + state.mark_idle() + + assert state.is_idle is True + assert state.busy_count == 0 + + +def test_runtime_state_does_not_go_negative(): + state = RuntimeState() + + state.mark_idle() + + assert state.busy_count == 0 + assert state.is_idle is True + + +def test_runtime_state_busy_context_resets_on_exception(): + state = RuntimeState() + + try: + with state.busy(): + assert state.is_idle is False + raise RuntimeError("boom") + except RuntimeError: + pass + + assert state.is_idle is True + + +def test_runtime_state_stop_request_is_sticky(): + state = RuntimeState() + + state.request_stop() + state.request_stop() + + assert state.is_stop_requested is True From 03f3c9b623a2acb82d6bf8800e41bf5cf8542f32 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 09:54:35 +0800 Subject: [PATCH 4/8] refactor: expose runtime idle status --- feapder/buffer/item_buffer.py | 15 ++- feapder/buffer/request_buffer.py | 118 ++++++++++-------- feapder/core/collector.py | 18 ++- feapder/core/parser_control.py | 23 +++- feapder/core/scheduler.py | 23 +--- feapder/core/spiders/air_spider.py | 13 +- tests/test_runtime_status.py | 187 +++++++++++++++++++++++++++++ 7 files changed, 312 insertions(+), 85 deletions(-) create mode 100644 tests/test_runtime_status.py diff --git a/feapder/buffer/item_buffer.py b/feapder/buffer/item_buffer.py index 35f9bb01..f333ba07 100644 --- a/feapder/buffer/item_buffer.py +++ b/feapder/buffer/item_buffer.py @@ -14,6 +14,7 @@ import feapder.utils.tools as tools from feapder import setting from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.dedup import Dedup from feapder.network.item import Item, UpdateItem from feapder.pipelines import BasePipeline @@ -31,6 +32,7 @@ class ItemBuffer(threading.Thread): def __init__(self, redis_key, task_table=None): if not hasattr(self, "_table_item"): super(ItemBuffer, self).__init__() + self._state = RuntimeState() self._thread_stop = False self._is_adding_to_db = False @@ -106,7 +108,8 @@ def mysql_pipeline(self): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: self.flush() tools.delay_time(setting.ITEM_UPLOAD_INTERVAL) @@ -114,6 +117,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def put_item(self, item): @@ -174,6 +178,15 @@ def flush(self): def get_items_count(self): return self._items_queue.qsize() + def pending_count(self): + return self.get_items_count() + + def is_idle(self): + return self.pending_count() == 0 and not self.is_adding_to_db() + + def is_stopped(self): + return self._state.is_stop_requested + def is_adding_to_db(self): return self._is_adding_to_db diff --git a/feapder/buffer/request_buffer.py b/feapder/buffer/request_buffer.py index 70677a94..f0c7be17 100644 --- a/feapder/buffer/request_buffer.py +++ b/feapder/buffer/request_buffer.py @@ -15,6 +15,7 @@ import feapder.utils.tools as tools from feapder.db.memorydb import MemoryDB from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.dedup import Dedup from feapder.utils.log import log @@ -60,6 +61,7 @@ class RequestBuffer(AirSpiderRequestBuffer, threading.Thread): def __init__(self, redis_key): AirSpiderRequestBuffer.__init__(self, db=RedisDB(), dedup_name=redis_key) threading.Thread.__init__(self) + self._state = RuntimeState() self._thread_stop = False self._is_adding_to_db = False @@ -74,7 +76,8 @@ def __init__(self, redis_key): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: self.__add_request_to_db() except Exception as e: @@ -84,6 +87,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def put_request(self, request): @@ -113,62 +117,74 @@ def flush(self): def get_requests_count(self): return len(self._requests_deque) + def get_delete_requests_count(self): + return len(self._del_requests_deque) + + def pending_count(self): + return self.get_requests_count() + self.get_delete_requests_count() + + def is_idle(self): + return self.pending_count() == 0 and not self.is_adding_to_db() + + def is_stopped(self): + return self._state.is_stop_requested + def is_adding_to_db(self): return self._is_adding_to_db def __add_request_to_db(self): + self._is_adding_to_db = True request_list = [] prioritys = [] callbacks = [] - - while self._requests_deque: - request = self._requests_deque.popleft() - self._is_adding_to_db = True - - if callable(request): - # 函数 - # 注意:应该考虑闭包情况。闭包情况可写成 - # def test(xxx = xxx): - # # TODO 业务逻辑 使用 xxx - # 这么写不会导致xxx为循环结束后的最后一个值 - callbacks.append(request) - continue - - priority = request.priority - - # 如果需要去重并且库中已重复 则continue - if self.is_exist_request(request): - continue - else: - request_list.append(str(request.to_dict)) - prioritys.append(priority) - - if len(request_list) > MAX_URL_COUNT: + try: + while self._requests_deque: + request = self._requests_deque.popleft() + + if callable(request): + # 函数 + # 注意:应该考虑闭包情况。闭包情况可写成 + # def test(xxx = xxx): + # # TODO 业务逻辑 使用 xxx + # 这么写不会导致xxx为循环结束后的最后一个值 + callbacks.append(request) + continue + + priority = request.priority + + # 如果需要去重并且库中已重复 则continue + if self.is_exist_request(request): + continue + else: + request_list.append(str(request.to_dict)) + prioritys.append(priority) + + if len(request_list) > MAX_URL_COUNT: + self._db.zadd(self._table_request, request_list, prioritys) + request_list = [] + prioritys = [] + + # 入库 + if request_list: self._db.zadd(self._table_request, request_list, prioritys) - request_list = [] - prioritys = [] - - # 入库 - if request_list: - self._db.zadd(self._table_request, request_list, prioritys) - - # 执行回调 - for callback in callbacks: - try: - callback() - except Exception as e: - log.exception(e) - - # 删除已做任务 - if self._del_requests_deque: - request_done_list = [] - while self._del_requests_deque: - request_done_list.append(self._del_requests_deque.popleft()) - - # 去掉request_list中的requests, 否则可能会将刚添加的request删除 - request_done_list = list(set(request_done_list) - set(request_list)) - if request_done_list: - self._db.zrem(self._table_request, request_done_list) - - self._is_adding_to_db = False + # 执行回调 + for callback in callbacks: + try: + callback() + except Exception as e: + log.exception(e) + + # 删除已做任务 + if self._del_requests_deque: + request_done_list = [] + while self._del_requests_deque: + request_done_list.append(self._del_requests_deque.popleft()) + + # 去掉request_list中的requests, 否则可能会将刚添加的request删除 + request_done_list = list(set(request_done_list) - set(request_list)) + + if request_done_list: + self._db.zrem(self._table_request, request_done_list) + finally: + self._is_adding_to_db = False diff --git a/feapder/core/collector.py b/feapder/core/collector.py index 5b8ff652..2f9f2822 100644 --- a/feapder/core/collector.py +++ b/feapder/core/collector.py @@ -15,6 +15,7 @@ import feapder.setting as setting import feapder.utils.tools as tools from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.network.request import Request from feapder.utils.log import log @@ -30,6 +31,7 @@ def __init__(self, redis_key): """ super(Collector, self).__init__() + self._state = RuntimeState() self._db = RedisDB() self._thread_stop = False @@ -40,9 +42,11 @@ def __init__(self, redis_key): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: - self.__input_data() + with self._state.busy(): + self.__input_data() except Exception as e: log.exception(e) time.sleep(0.1) @@ -51,6 +55,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def __input_data(self): @@ -112,5 +117,14 @@ def get_requests_count(self): self._todo_requests.qsize() or self._db.zget_count(self._tab_requests) or 0 ) + def pending_count(self): + return self.get_requests_count() + + def is_idle(self): + return not self.is_collector_task() and self.pending_count() == 0 + + def is_stopped(self): + return self._state.is_stop_requested + def is_collector_task(self): return self._is_collector_task diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 021d2956..da2bfaf2 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -18,6 +18,7 @@ from feapder.buffer.item_buffer import ItemBuffer from feapder.buffer.request_buffer import AirSpiderRequestBuffer from feapder.core.base_parser import BaseParser +from feapder.core.runtime_state import RuntimeState from feapder.db.memorydb import MemoryDB from feapder.network.item import Item from feapder.network.request import Request @@ -42,6 +43,7 @@ class ParserControl(threading.Thread): def __init__(self, collector, redis_key, request_buffer, item_buffer): super(ParserControl, self).__init__() + self._state = RuntimeState() self._parsers = [] self._collector = collector self._redis_key = redis_key @@ -52,7 +54,8 @@ def __init__(self, collector, redis_key, request_buffer, item_buffer): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: request = self._collector.get_request() if not request: @@ -62,7 +65,8 @@ def run(self): continue self.is_show_tip = False - self.deal_request(request) + with self._state.busy(): + self.deal_request(request) except Exception as e: log.exception(e) @@ -70,6 +74,12 @@ def run(self): def is_not_task(self): return self.is_show_tip + def is_idle(self): + return self.is_not_task() and self._state.is_idle + + def is_stopped(self): + return self._state.is_stop_requested + @classmethod def get_task_status_count(cls): return cls._failed_task_count, cls._success_task_count, cls._total_task_count @@ -431,6 +441,7 @@ def record_download_status(self, status, spider): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def add_parser(self, parser: BaseParser): @@ -467,6 +478,7 @@ def __init__( item_buffer: ItemBuffer, ): super(ParserControl, self).__init__() + self._state = RuntimeState() self._parsers = [] self._memory_db = memory_db self._thread_stop = False @@ -474,7 +486,9 @@ def __init__( self._item_buffer = item_buffer def run(self): - while not self._thread_stop: + self._thread_stop = False + self._state = RuntimeState() + while not self._state.is_stop_requested: try: request = self._memory_db.get() if not request: @@ -484,7 +498,8 @@ def run(self): continue self.is_show_tip = False - self.deal_request(request) + with self._state.busy(): + self.deal_request(request) except Exception as e: log.exception(e) diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py index 0177d185..7fb1fcc0 100644 --- a/feapder/core/scheduler.py +++ b/feapder/core/scheduler.py @@ -286,32 +286,19 @@ def _start(self): self.__add_task() def all_thread_is_done(self): - # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性 + # Check three times to avoid transient idle states between runtime stages. for i in range(3): - # 检测 collector 状态 - if ( - self._collector.is_collector_task() - or self._collector.get_requests_count() > 0 - ): + if not self._collector.is_idle(): return False - # 检测 parser_control 状态 for parser_control in self._parser_controls: - if not parser_control.is_not_task(): + if not parser_control.is_idle(): return False - # 检测 item_buffer 状态 - if ( - self._item_buffer.get_items_count() > 0 - or self._item_buffer.is_adding_to_db() - ): + if not self._item_buffer.is_idle(): return False - # 检测 request_buffer 状态 - if ( - self._request_buffer.get_requests_count() > 0 - or self._request_buffer.is_adding_to_db() - ): + if not self._request_buffer.is_idle(): return False tools.delay_time(1) diff --git a/feapder/core/spiders/air_spider.py b/feapder/core/spiders/air_spider.py index 70c30112..f51f8930 100644 --- a/feapder/core/spiders/air_spider.py +++ b/feapder/core/spiders/air_spider.py @@ -57,21 +57,16 @@ def distribute_task(self): self._request_buffer.put_request(request, ignore_max_size=False) def all_thread_is_done(self): - for i in range(3): # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性 - # 检测 parser_control 状态 + # Check three times to avoid transient idle states between runtime stages. + for i in range(3): for parser_control in self._parser_controls: - if not parser_control.is_not_task(): + if not parser_control.is_idle(): return False - # 检测 任务队列 状态 if not self._memory_db.empty(): return False - # 检测 item_buffer 状态 - if ( - self._item_buffer.get_items_count() > 0 - or self._item_buffer.is_adding_to_db() - ): + if not self._item_buffer.is_idle(): return False tools.delay_time(1) diff --git a/tests/test_runtime_status.py b/tests/test_runtime_status.py new file mode 100644 index 00000000..1cc53e4e --- /dev/null +++ b/tests/test_runtime_status.py @@ -0,0 +1,187 @@ +import collections +from queue import Queue + +from feapder.buffer.item_buffer import ItemBuffer +from feapder.buffer.request_buffer import RequestBuffer +from feapder.core.collector import Collector +from feapder.core.scheduler import Scheduler +from feapder.core.runtime_state import RuntimeState + + +class FakeRedis: + def __init__(self, count=0): + self.count = count + + def zget_count(self, table): + return self.count + + +class FakeDoneComponent: + def __init__(self, idle=True): + self._idle = idle + + def is_idle(self): + return self._idle + + +def build_collector(local_count=0, backend_count=0, busy=False): + collector = object.__new__(Collector) + collector._state = RuntimeState() + collector._db = FakeRedis(backend_count) + collector._tab_requests = "test:z_requests" + collector._todo_requests = Queue() + collector._is_collector_task = busy + for i in range(local_count): + collector._todo_requests.put({"request_obj": i, "request_redis": str(i)}) + return collector + + +def build_request_buffer(request_count=0, delete_count=0, flushing=False): + buffer = object.__new__(RequestBuffer) + buffer._state = RuntimeState() + buffer._requests_deque = collections.deque(range(request_count)) + buffer._del_requests_deque = collections.deque(range(delete_count)) + buffer._is_adding_to_db = flushing + return buffer + + +def build_item_buffer(item_count=0, flushing=False): + buffer = object.__new__(ItemBuffer) + buffer._state = RuntimeState() + buffer._items_queue = Queue() + buffer._is_adding_to_db = flushing + for i in range(item_count): + buffer._items_queue.put(i) + return buffer + + +def test_collector_pending_count_includes_local_queue_first(): + collector = build_collector(local_count=2, backend_count=5) + + assert collector.pending_count() == 2 + assert collector.is_idle() is False + + +def test_collector_pending_count_uses_backend_when_local_empty(): + collector = build_collector(local_count=0, backend_count=3) + + assert collector.pending_count() == 3 + assert collector.is_idle() is False + + +def test_collector_idle_when_not_collecting_and_empty(): + collector = build_collector(local_count=0, backend_count=0, busy=False) + + assert collector.pending_count() == 0 + assert collector.is_idle() is True + + +def test_request_buffer_pending_count_includes_writes_and_deletes(): + buffer = build_request_buffer(request_count=2, delete_count=1) + + assert buffer.pending_count() == 3 + assert buffer.is_idle() is False + + +def test_request_buffer_idle_when_empty_and_not_flushing(): + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + + assert buffer.pending_count() == 0 + assert buffer.is_idle() is True + + +def test_item_buffer_idle_when_queue_empty_and_not_flushing(): + buffer = build_item_buffer(item_count=0, flushing=False) + + assert buffer.pending_count() == 0 + assert buffer.is_idle() is True + + +def test_item_buffer_not_idle_while_flushing(): + buffer = build_item_buffer(item_count=0, flushing=True) + + assert buffer.is_idle() is False + + +def test_scheduler_uses_component_idle_methods(monkeypatch): + monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None) + scheduler = object.__new__(Scheduler) + scheduler._collector = FakeDoneComponent(idle=True) + scheduler._parser_controls = [FakeDoneComponent(idle=True)] + scheduler._item_buffer = FakeDoneComponent(idle=True) + scheduler._request_buffer = FakeDoneComponent(idle=True) + + assert scheduler.all_thread_is_done() is True + + +def test_scheduler_waits_for_busy_parser(monkeypatch): + monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None) + scheduler = object.__new__(Scheduler) + scheduler._collector = FakeDoneComponent(idle=True) + scheduler._parser_controls = [FakeDoneComponent(idle=False)] + scheduler._item_buffer = FakeDoneComponent(idle=True) + scheduler._request_buffer = FakeDoneComponent(idle=True) + + assert scheduler.all_thread_is_done() is False + + +def test_component_run_resets_stop_state_before_loop(monkeypatch): + collector = build_collector(local_count=0, backend_count=0) + collector._state.request_stop() + calls = {"count": 0} + + def stop_after_one_input(): + calls["count"] += 1 + collector._state.request_stop() + + monkeypatch.setattr(collector, "_Collector__input_data", stop_after_one_input) + + collector.run() + + assert calls["count"] == 1 + assert collector.is_stopped() is True + + +def test_request_buffer_not_idle_during_delete_only_flush(): + class InspectingDB: + def __init__(self, buffer): + self.buffer = buffer + self.idle_during_zrem = None + + def zrem(self, table, values): + self.idle_during_zrem = self.buffer.is_idle() + + buffer = build_request_buffer(request_count=0, delete_count=1, flushing=False) + buffer._table_request = "test:z_requests" + buffer._db = InspectingDB(buffer) + + buffer.flush() + + assert buffer._db.idle_during_zrem is False + assert buffer.is_idle() is True + + +def test_request_buffer_marks_noop_flush_attempt_busy_while_checking_queue(): + class InspectingEmptyDeque: + def __init__(self, buffer): + self.buffer = buffer + self.busy_observations = [] + + def __bool__(self): + self.busy_observations.append(self.buffer.is_adding_to_db()) + return False + + def __len__(self): + return 0 + + def popleft(self): + raise AssertionError("empty queue should not be popped") + + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + buffer._requests_deque = InspectingEmptyDeque(buffer) + + buffer.flush() + + assert buffer._requests_deque.busy_observations + assert all(buffer._requests_deque.busy_observations) + assert buffer.is_idle() is True From 68ca9982c5b0ac3224d9088787a1908bbe69fd04 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 10:20:13 +0800 Subject: [PATCH 5/8] refactor: add parser result dispatcher --- feapder/core/result_dispatcher.py | 85 +++++++++++++++ tests/test_result_dispatcher.py | 169 ++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 feapder/core/result_dispatcher.py create mode 100644 tests/test_result_dispatcher.py diff --git a/feapder/core/result_dispatcher.py b/feapder/core/result_dispatcher.py new file mode 100644 index 00000000..43186cd7 --- /dev/null +++ b/feapder/core/result_dispatcher.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +Parser result routing for runtime parser controls. +""" +from collections.abc import Iterable +from dataclasses import dataclass + +from feapder.network.item import Item +from feapder.network.request import Request + + +@dataclass +class DispatchResult: + del_request_redis_after_item_to_db: bool = False + del_request_redis_after_request_to_db: bool = False + + +class ResultDispatcher: + REQUEST_RESULT = 1 + ITEM_RESULT = 2 + + def __init__( + self, + *, + request_buffer, + item_buffer, + deal_request, + sync_request_factory=None, + allow_callable=True, + ): + self._request_buffer = request_buffer + self._item_buffer = item_buffer + self._deal_request = deal_request + self._sync_request_factory = sync_request_factory or (lambda request: request) + self._allow_callable = allow_callable + + def dispatch(self, parser, request, results): + if results and not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") + ) + + dispatch_result = DispatchResult() + result_type = 0 + + for result in results or []: + if isinstance(result, Request): + result_type = self.REQUEST_RESULT + result.parser_name = result.parser_name or parser.name + if result.request_sync: + self._deal_request(self._sync_request_factory(result)) + else: + self._request_buffer.put_request(result) + dispatch_result.del_request_redis_after_request_to_db = True + + elif isinstance(result, Item): + result_type = self.ITEM_RESULT + self._item_buffer.put_item(result) + dispatch_result.del_request_redis_after_item_to_db = True + + elif callable(result) and self._allow_callable: + if result_type == self.ITEM_RESULT: + self._item_buffer.put_item(result) + dispatch_result.del_request_redis_after_item_to_db = True + else: + self._request_buffer.put_request(result) + dispatch_result.del_request_redis_after_request_to_db = True + + elif result is not None: + raise TypeError(self._format_type_error(parser, request, result)) + + return dispatch_result + + def _format_type_error(self, parser, request, result): + callback_name = ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) or "parse" + expected = "Request、Item or callback" if self._allow_callable else "Request or Item" + return ( + f"{parser.name}.{callback_name} result expect {expected}, " + f"bug get type: {type(result)}" + ) diff --git a/tests/test_result_dispatcher.py b/tests/test_result_dispatcher.py new file mode 100644 index 00000000..cd4e29df --- /dev/null +++ b/tests/test_result_dispatcher.py @@ -0,0 +1,169 @@ +import pytest + +from feapder.core.result_dispatcher import ResultDispatcher +from feapder.network.item import Item, UpdateItem +from feapder.network.request import Request + + +class FakeParser: + name = "FakeParser" + + +class FakeRequestBuffer: + def __init__(self): + self.requests = [] + + def put_request(self, request): + self.requests.append(request) + + +class FakeItemBuffer: + def __init__(self): + self.items = [] + + def put_item(self, item): + self.items.append(item) + + +def test_dispatcher_routes_async_request_and_sets_parser_name(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + calls = [] + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=calls.append, + sync_request_factory=lambda request: {"request_obj": request, "request_redis": None}, + ) + next_request = Request("https://example.com") + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com", callback="parse"), + results=[next_request], + ) + + assert next_request.parser_name == "FakeParser" + assert request_buffer.requests == [next_request] + assert item_buffer.items == [] + assert calls == [] + assert result.del_request_redis_after_request_to_db is True + assert result.del_request_redis_after_item_to_db is False + + +def test_dispatcher_routes_sync_request_to_deal_request(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + calls = [] + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=calls.append, + sync_request_factory=lambda request: {"request_obj": request, "request_redis": None}, + ) + next_request = Request("https://example.com", request_sync=True) + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[next_request], + ) + + assert calls == [{"request_obj": next_request, "request_redis": None}] + assert request_buffer.requests == [] + assert result.del_request_redis_after_request_to_db is False + + +def test_dispatcher_routes_item_and_update_item_to_item_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + item = Item(title="one") + update_item = UpdateItem(id=1, title="two") + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[item, update_item], + ) + + assert item_buffer.items == [item, update_item] + assert request_buffer.requests == [] + assert result.del_request_redis_after_item_to_db is True + + +def test_dispatcher_routes_callable_after_item_to_item_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + callback = lambda: None + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[Item(title="one"), callback], + ) + + assert item_buffer.items[-1] is callback + assert request_buffer.requests == [] + assert result.del_request_redis_after_item_to_db is True + + +def test_dispatcher_routes_callable_without_prior_item_to_request_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + callback = lambda: None + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[callback], + ) + + assert request_buffer.requests == [callback] + assert item_buffer.items == [] + assert result.del_request_redis_after_request_to_db is True + + +def test_dispatcher_rejects_callable_when_disabled(): + dispatcher = ResultDispatcher( + request_buffer=FakeRequestBuffer(), + item_buffer=FakeItemBuffer(), + deal_request=lambda request: None, + allow_callable=False, + ) + + with pytest.raises(TypeError, match="FakeParser.parse result expect Request or Item"): + dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[lambda: None], + ) + + +def test_dispatcher_rejects_invalid_result_type(): + dispatcher = ResultDispatcher( + request_buffer=FakeRequestBuffer(), + item_buffer=FakeItemBuffer(), + deal_request=lambda request: None, + ) + + with pytest.raises(TypeError, match="FakeParser.parse result expect Request"): + dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[object()], + ) From 2079e1f7174dfb3b17d6d9597bb7f55713538c48 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 10:32:46 +0800 Subject: [PATCH 6/8] refactor: split parser control request handling --- feapder/core/parser_control.py | 645 ++++++++++++++------------- tests/test_parser_control_runtime.py | 231 ++++++++++ 2 files changed, 558 insertions(+), 318 deletions(-) create mode 100644 tests/test_parser_control_runtime.py diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index da2bfaf2..12b14357 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -18,6 +18,7 @@ from feapder.buffer.item_buffer import ItemBuffer from feapder.buffer.request_buffer import AirSpiderRequestBuffer from feapder.core.base_parser import BaseParser +from feapder.core.result_dispatcher import ResultDispatcher from feapder.core.runtime_state import RuntimeState from feapder.db.memorydb import MemoryDB from feapder.network.item import Item @@ -84,352 +85,360 @@ def is_stopped(self): def get_task_status_count(cls): return cls._failed_task_count, cls._success_task_count, cls._total_task_count - def deal_request(self, request): - response = None - request_redis = request["request_redis"] - request = request["request_obj"] - - del_request_redis_after_item_to_db = False - del_request_redis_after_request_to_db = False + def _make_sync_request_payload(self, request): + return {"request_obj": request, "request_redis": None} + def _find_parser(self, request): for parser in self._parsers: if parser.name == request.parser_name: - used_download_midware_enable = False - try: - self.__class__._total_task_count += 1 - # 记录需下载的文档 - self.record_download_status( - ParserControl.DOWNLOAD_TOTAL, parser.name - ) - - # 解析request - if request.auto_request: - request_temp = None - response = None - - # 下载中间件 - if request.download_midware: - if isinstance(request.download_midware, (list, tuple)): - request_temp = request - for download_midware in request.download_midware: - download_midware = ( - download_midware - if callable(download_midware) - else tools.get_method(parser, download_midware) - ) - request_temp = download_midware(request_temp) - else: - download_midware = ( - request.download_midware - if callable(request.download_midware) - else tools.get_method( - parser, request.download_midware - ) - ) - request_temp = download_midware(request) - elif request.download_midware != False: - request_temp = parser.download_midware(request) - - # 请求 - if request_temp: - if ( - isinstance(request_temp, (tuple, list)) - and len(request_temp) == 2 - ): - request_temp, response = request_temp - - if not isinstance(request_temp, Request): - raise Exception( - "download_midware need return a request, but received type: {}".format( - type(request_temp) - ) - ) - used_download_midware_enable = True - if response is None: - response = ( - request_temp.get_response() - if not setting.RESPONSE_CACHED_USED - else request_temp.get_response_from_cached( - save_cached=False - ) - ) - else: - response = ( - request.get_response() - if not setting.RESPONSE_CACHED_USED - else request.get_response_from_cached(save_cached=False) - ) - - if response == None: - raise Exception( - "连接超时 url: %s" % (request.url or request_temp.url) - ) - - # 校验 - if parser.validate(request, response) == False: - break + return parser + return None - else: - response = None + def _prepare_response(self, parser, request): + used_download_midware_enable = False + if not request.auto_request: + return request, None, used_download_midware_enable - if request.callback: # 如果有parser的回调函数,则用回调处理 - callback_parser = ( - request.callback - if callable(request.callback) - else tools.get_method(parser, request.callback) - ) - results = callback_parser(request, response) - else: # 否则默认用parser处理 - results = parser.parse(request, response) - - if results and not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") - ) + request_temp = None + response = None - # 标识上一个result是什么 - result_type = 0 # 0\1\2 (初始值\request\item) - # 此处判断是request 还是 item - for result in results or []: - if isinstance(result, Request): - result_type = 1 - # 给request的 parser_name 赋值 - result.parser_name = result.parser_name or parser.name + # 下载中间件 + if request.download_midware: + if isinstance(request.download_midware, (list, tuple)): + request_temp = request + for download_midware in request.download_midware: + download_midware = ( + download_midware + if callable(download_midware) + else tools.get_method(parser, download_midware) + ) + request_temp = download_midware(request_temp) + else: + download_midware = ( + request.download_midware + if callable(request.download_midware) + else tools.get_method(parser, request.download_midware) + ) + request_temp = download_midware(request) + elif request.download_midware != False: + request_temp = parser.download_midware(request) + + # 请求 + if request_temp: + if isinstance(request_temp, (tuple, list)) and len(request_temp) == 2: + request_temp, response = request_temp + + if not isinstance(request_temp, Request): + raise Exception( + "download_midware need return a request, but received type: {}".format( + type(request_temp) + ) + ) + used_download_midware_enable = True + request = request_temp + + return request, response, used_download_midware_enable + + def _download_response(self, request, response): + if response is None: + response = ( + request.get_response() + if not setting.RESPONSE_CACHED_USED + else request.get_response_from_cached(save_cached=False) + ) + + if response == None: + raise Exception("连接超时 url: %s" % request.url) + + return response + + def _run_callback(self, parser, request, response): + if request.callback: # 如果有parser的回调函数,则用回调处理 + callback_parser = ( + request.callback + if callable(request.callback) + else tools.get_method(parser, request.callback) + ) + return callback_parser(request, response) + + # 否则默认用parser处理 + return parser.parse(request, response) + + def _dispatch_results(self, parser, request, results): + dispatcher = ResultDispatcher( + request_buffer=self._request_buffer, + item_buffer=self._item_buffer, + deal_request=self.deal_request, + sync_request_factory=self._make_sync_request_payload, + allow_callable=True, + ) + return dispatcher.dispatch(parser, request, results) + + def _finish_request( + self, + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ): + # 删除正在做的request 跟随item优先 + if not request_redis: + return + + if del_request_redis_after_item_to_db: + self._item_buffer.put_item(request_redis) + elif del_request_redis_after_request_to_db: + self._request_buffer.put_del_request(request_redis) + else: + self._request_buffer.put_del_request(request_redis) + + def _record_success(self, parser, request, response): + # 记录下载成功的文档 + self.record_download_status(ParserControl.DOWNLOAD_SUCCESS, parser.name) + # 记录成功任务数 + self.__class__._success_task_count += 1 + + # 缓存下载成功的文档 + if setting.RESPONSE_CACHED_ENABLE: + request.save_cached( + response=response, + expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, + ) + + def _release_response_browser(self, request, response): + # 释放浏览器 + if response and getattr(response, "browser", None): + request.render_downloader.put_back(response.browser) + + def _sleep_after_request(self): + if setting.SPIDER_SLEEP_TIME: + if ( + isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list)) + and len(setting.SPIDER_SLEEP_TIME) == 2 + ): + sleep_time = random.randint( + int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1]) + ) + time.sleep(sleep_time) + else: + time.sleep(setting.SPIDER_SLEEP_TIME) - # 判断是同步的callback还是异步的 - if result.request_sync: # 同步 - request_dict = { - "request_obj": result, - "request_redis": None, - } - self.deal_request(request_dict) - else: # 异步 - # 将next_request 入库 - self._request_buffer.put_request(result) - del_request_redis_after_request_to_db = True + def _original_request_after_middleware(self, request_redis, request): + return Request.from_dict(eval(request_redis)) if request_redis else request - elif isinstance(result, Item): - result_type = 2 - # 将item入库 - self._item_buffer.put_item(result) - # 需删除正在做的request - del_request_redis_after_item_to_db = True + def _handle_exception( + self, + parser, + request, + request_redis, + response, + exception, + used_download_midware_enable, + ): + del_request_redis_after_item_to_db = False + del_request_redis_after_request_to_db = False - elif callable(result): # result为可执行的无参函数 - if result_type == 2: # item 的 callback,buffer里的item均入库后再执行 - self._item_buffer.put_item(result) - del_request_redis_after_item_to_db = True + exception_type = str(type(exception)).replace("", "") + if exception_type.startswith("requests"): + # 记录下载失败的文档 + self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name) + if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: + request.del_proxy() + + else: + # 记录解析程序异常 + self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name) + + if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 + log.exception(exception) + + log.error( + """ + -------------- %s.%s error ------------- + error %s + response %s + deal request %s + """ + % ( + parser.name, + ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) + or "parse", + str(exception), + response, + tools.dumps_json(request.to_dict, indent=28) + if setting.LOG_LEVEL == "DEBUG" + else request, + ) + ) - else: # result_type == 1: # request 的 callback,buffer里的request均入库后再执行。可能有的parser直接返回callback - self._request_buffer.put_request(result) - del_request_redis_after_request_to_db = True + request.error_msg = "%s: %s" % (exception_type, exception) + request.response = str(response) - elif result is not None: - function_name = "{}.{}".format( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - ) - raise TypeError( - f"{function_name} result expect Request、Item or callback, bug get type: {type(result)}" - ) + if "Invalid URL" in str(exception): + request.is_abandoned = True - except Exception as e: - exception_type = ( - str(type(e)).replace("", "") - ) - if exception_type.startswith("requests"): - # 记录下载失败的文档 - self.record_download_status( - ParserControl.DOWNLOAD_EXCEPTION, parser.name - ) - if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: - request.del_proxy() + requests = parser.exception_request(request, response, exception) or [request] + if not isinstance(requests, Iterable): + raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request")) - else: - # 记录解析程序异常 - self.record_download_status( - ParserControl.PAESERS_EXCEPTION, parser.name - ) + for exception_request in requests: + if callable(exception_request): + self._request_buffer.put_request(exception_request) + continue - if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 - log.exception(e) + if not isinstance(exception_request, Request): + raise Exception("exception_request 需 yield request") - log.error( - """ - -------------- %s.%s error ------------- - error %s - response %s - deal request %s - """ - % ( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - str(e), - response, - tools.dumps_json(request.to_dict, indent=28) - if setting.LOG_LEVEL == "DEBUG" - else request, - ) + if ( + exception_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES + or exception_request.is_abandoned + ): + self.__class__._failed_task_count += 1 # 记录失败任务数 + + # 处理failed_request的返回值 request 或 func + results = parser.failed_request( + exception_request, response, exception + ) or [exception_request] + if not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, "failed_request") ) - request.error_msg = "%s: %s" % (exception_type, e) - request.response = str(response) - - if "Invalid URL" in str(e): - request.is_abandoned = True - - requests = parser.exception_request(request, response, e) or [ - request - ] - if not isinstance(requests, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "exception_request") - ) - for request in requests: - if callable(request): - self._request_buffer.put_request(request) - continue - - if not isinstance(request, Request): - raise Exception("exception_request 需 yield request") - - if ( - request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES - or request.is_abandoned - ): - self.__class__._failed_task_count += 1 # 记录失败任务数 - - # 处理failed_request的返回值 request 或 func - results = parser.failed_request(request, response, e) or [ - request - ] - if not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "failed_request") + for result in results: + if isinstance(result, Request): + if setting.SAVE_FAILED_REQUEST: + if used_download_midware_enable: + # 去掉download_midware 添加的属性 + original_request = self._original_request_after_middleware( + request_redis, result ) + original_request.error_msg = exception_request.error_msg + original_request.response = exception_request.response - for result in results: - if isinstance(result, Request): - if setting.SAVE_FAILED_REQUEST: - if used_download_midware_enable: - # 去掉download_midware 添加的属性 - original_request = ( - Request.from_dict(eval(request_redis)) - if request_redis - else result - ) - original_request.error_msg = ( - request.error_msg - ) - original_request.response = request.response - - self._request_buffer.put_failed_request( - original_request - ) - else: - self._request_buffer.put_failed_request( - result - ) - - elif callable(result): - self._request_buffer.put_request(result) - - elif isinstance(result, Item): - self._item_buffer.put_item(result) - - del_request_redis_after_request_to_db = True - - else: - # 将 requests 重新入库 爬取 - request.retry_times += 1 - request.filter_repeat = False - log.info( - """ - 入库 等待重试 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, + self._request_buffer.put_failed_request( + original_request ) - ) - if used_download_midware_enable: - # 去掉download_midware 添加的属性 使用原来的requests - original_request = ( - Request.from_dict(eval(request_redis)) - if request_redis - else request - ) - if hasattr(request, "error_msg"): - original_request.error_msg = request.error_msg - if hasattr(request, "response"): - original_request.response = request.response - original_request.retry_times = request.retry_times - original_request.filter_repeat = request.filter_repeat - - self._request_buffer.put_request(original_request) else: - self._request_buffer.put_request(request) - del_request_redis_after_request_to_db = True + self._request_buffer.put_failed_request(result) - else: - # 记录下载成功的文档 - self.record_download_status( - ParserControl.DOWNLOAD_SUCCESS, parser.name - ) - # 记录成功任务数 - self.__class__._success_task_count += 1 + elif callable(result): + self._request_buffer.put_request(result) - # 缓存下载成功的文档 - if setting.RESPONSE_CACHED_ENABLE: - request.save_cached( - response=response, - expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, - ) + elif isinstance(result, Item): + self._item_buffer.put_item(result) - finally: - # 释放浏览器 - if response and getattr(response, "browser", None): - request.render_downloader.put_back(response.browser) - - break + del_request_redis_after_request_to_db = True - # 删除正在做的request 跟随item优先 - if request_redis: - if del_request_redis_after_item_to_db: - self._item_buffer.put_item(request_redis) + else: + # 将 requests 重新入库 爬取 + exception_request.retry_times += 1 + exception_request.filter_repeat = False + log.info( + """ + 入库 等待重试 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + exception_request.url, + exception_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) + ) + if used_download_midware_enable: + # 去掉download_midware 添加的属性 使用原来的requests + original_request = self._original_request_after_middleware( + request_redis, exception_request + ) + if hasattr(exception_request, "error_msg"): + original_request.error_msg = exception_request.error_msg + if hasattr(exception_request, "response"): + original_request.response = exception_request.response + original_request.retry_times = exception_request.retry_times + original_request.filter_repeat = exception_request.filter_repeat + + self._request_buffer.put_request(original_request) + else: + self._request_buffer.put_request(exception_request) + del_request_redis_after_request_to_db = True - elif del_request_redis_after_request_to_db: - self._request_buffer.put_del_request(request_redis) + return ( + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) - else: - self._request_buffer.put_del_request(request_redis) + def deal_request(self, request): + response = None + request_redis = request["request_redis"] + request = request["request_obj"] - if setting.SPIDER_SLEEP_TIME: - if ( - isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list)) - and len(setting.SPIDER_SLEEP_TIME) == 2 - ): - sleep_time = random.randint( - int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1]) - ) - time.sleep(sleep_time) - else: - time.sleep(setting.SPIDER_SLEEP_TIME) + del_request_redis_after_item_to_db = False + del_request_redis_after_request_to_db = False + used_download_midware_enable = False + + parser = self._find_parser(request) + if not parser: + self._finish_request( + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) + self._sleep_after_request() + return + + try: + self.__class__._total_task_count += 1 + # 记录需下载的文档 + self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name) + + # 解析request + request, response, used_download_midware_enable = self._prepare_response( + parser, request + ) + if request.auto_request: + response = self._download_response(request, response) + + # 校验 + if request.auto_request and parser.validate(request, response) == False: + return + + results = self._run_callback(parser, request, response) + dispatch_result = self._dispatch_results(parser, request, results) + del_request_redis_after_item_to_db = ( + dispatch_result.del_request_redis_after_item_to_db + ) + del_request_redis_after_request_to_db = ( + dispatch_result.del_request_redis_after_request_to_db + ) + + except Exception as e: + ( + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) = self._handle_exception( + parser, + request, + request_redis, + response, + e, + used_download_midware_enable, + ) + + else: + self._record_success(parser, request, response) + + finally: + self._release_response_browser(request, response) + self._finish_request( + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) + self._sleep_after_request() def record_download_status(self, status, spider): """ diff --git a/tests/test_parser_control_runtime.py b/tests/test_parser_control_runtime.py new file mode 100644 index 00000000..bf4c3212 --- /dev/null +++ b/tests/test_parser_control_runtime.py @@ -0,0 +1,231 @@ +import pytest + +import feapder.setting as setting +from feapder.core.parser_control import ParserControl +from feapder.network.request import Request +from feapder.network.item import Item + + +class FakeResponse: + browser = None + + def __str__(self): + return "" + + +class FakeParser: + name = "FakeParser" + + def __init__(self): + self.validated = [] + self.parsed = [] + + def download_midware(self, request): + return None + + def validate(self, request, response): + self.validated.append((request, response)) + return True + + def parse(self, request, response): + self.parsed.append((request, response)) + return [Item(title="ok")] + + def exception_request(self, request, response, e): + return [request] + + def failed_request(self, request, response, e): + return [request] + + +class FakeParserRecordingExceptions(FakeParser): + def __init__(self): + super().__init__() + self.exception_requests = [] + + def exception_request(self, request, response, e): + self.exception_requests.append((request, response, e)) + return [request] + + +class FakeRequestBuffer: + def __init__(self): + self.requests = [] + self.deleted = [] + self.failed = [] + + def put_request(self, request): + self.requests.append(request) + + def put_del_request(self, request): + self.deleted.append(request) + + def put_failed_request(self, request): + self.failed.append(request) + + +class FakeItemBuffer: + def __init__(self): + self.items = [] + + def put_item(self, item): + self.items.append(item) + + +def build_control(): + control = object.__new__(ParserControl) + control._parsers = [] + control._request_buffer = FakeRequestBuffer() + control._item_buffer = FakeItemBuffer() + return control + + +def test_find_parser_by_request_parser_name(): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + + assert control._find_parser(Request(parser_name="FakeParser")) is parser + + +def test_find_parser_returns_none_for_missing_parser(): + control = build_control() + control._parsers = [FakeParser()] + + assert control._find_parser(Request(parser_name="OtherParser")) is None + + +def test_run_callback_uses_named_callback(): + parser = FakeParser() + parser.custom = lambda request, response: [Item(title="custom")] + control = build_control() + request = Request(callback="custom") + response = FakeResponse() + + results = control._run_callback(parser, request, response) + + assert len(results) == 1 + assert results[0]["title"] == "custom" + + +def test_finish_request_prefers_item_buffer_for_item_results(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=True, + del_request_redis_after_request_to_db=True, + ) + + assert control._item_buffer.items == ["raw-request"] + assert control._request_buffer.deleted == [] + + +def test_finish_request_deletes_via_request_buffer_for_request_results(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=False, + del_request_redis_after_request_to_db=True, + ) + + assert control._item_buffer.items == [] + assert control._request_buffer.deleted == ["raw-request"] + + +def test_finish_request_deletes_unclaimed_request_by_default(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=False, + del_request_redis_after_request_to_db=False, + ) + + assert control._request_buffer.deleted == ["raw-request"] + + +def test_deal_request_uses_request_returned_by_download_middleware(monkeypatch): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + response = FakeResponse() + replacement_request = Request( + "https://replacement.example.com", + parser_name="FakeParser", + auto_request=True, + ) + + def replace_request(request): + return replacement_request, response + + request = Request( + "https://example.com", + parser_name="FakeParser", + download_midware=replace_request, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request({"request_obj": request, "request_redis": "raw-request"}) + + assert parser.validated == [(replacement_request, response)] + assert parser.parsed == [(replacement_request, response)] + + +def test_deal_request_handles_fetch_exception_on_replacement_request(monkeypatch): + parser = FakeParserRecordingExceptions() + control = build_control() + control._parsers = [parser] + replacement_request = Request( + "https://replacement.example.com", + parser_name="FakeParser", + auto_request=True, + ) + + def raise_fetch_error(): + raise RuntimeError("replacement fetch failed") + + replacement_request.get_response = raise_fetch_error + + def replace_request(request): + return replacement_request + + request = Request( + "https://example.com", + parser_name="FakeParser", + download_midware=replace_request, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + monkeypatch.setattr(setting, "LOG_LEVEL", "INFO") + + control.deal_request({"request_obj": request, "request_redis": None}) + + assert parser.exception_requests + assert parser.exception_requests[0][0] is replacement_request + + +def test_deal_request_dispatches_item_and_marks_request_for_item_delete(monkeypatch): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + response = FakeResponse() + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=False, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request({"request_obj": request, "request_redis": "raw-request"}) + + assert len(control._item_buffer.items) == 2 + assert isinstance(control._item_buffer.items[0], Item) + assert control._item_buffer.items[1] == "raw-request" + assert control._request_buffer.deleted == [] From 064450a13c44034e291a9ebf8f5c71d7cea74137 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 11:01:18 +0800 Subject: [PATCH 7/8] refactor: share air parser control runtime helpers --- feapder/core/parser_control.py | 405 +++++++++++---------------- tests/test_parser_control_runtime.py | 77 ++++- 2 files changed, 238 insertions(+), 244 deletions(-) diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 12b14357..646cb2a8 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -513,259 +513,178 @@ def run(self): except Exception as e: log.exception(e) - def deal_request(self, request): - response = None + def _make_sync_request_payload(self, request): + return request - for parser in self._parsers: - if parser.name == request.parser_name: - try: - self.__class__._total_task_count += 1 - # 记录需下载的文档 - self.record_download_status( - ParserControl.DOWNLOAD_TOTAL, parser.name - ) + def _dispatch_results(self, parser, request, results): + dispatcher = ResultDispatcher( + request_buffer=self._request_buffer, + item_buffer=self._item_buffer, + deal_request=self.deal_request, + sync_request_factory=self._make_sync_request_payload, + allow_callable=False, + ) + return dispatcher.dispatch(parser, request, results) - # 解析request - if request.auto_request: - request_temp = None - response = None - - # 下载中间件 - if request.download_midware: - if isinstance(request.download_midware, (list, tuple)): - request_temp = request - for download_midware in request.download_midware: - download_midware = ( - download_midware - if callable(download_midware) - else tools.get_method(parser, download_midware) - ) - request_temp = download_midware(request_temp) - else: - download_midware = ( - request.download_midware - if callable(request.download_midware) - else tools.get_method( - parser, request.download_midware - ) - ) - request_temp = download_midware(request) - elif request.download_midware != False: - request_temp = parser.download_midware(request) - - # 请求 - if request_temp: - if ( - isinstance(request_temp, (tuple, list)) - and len(request_temp) == 2 - ): - request_temp, response = request_temp - - if not isinstance(request_temp, Request): - raise Exception( - "download_midware need return a request, but received type: {}".format( - type(request_temp) - ) - ) - request = request_temp - - if response is None: - response = ( - request.get_response() - if not setting.RESPONSE_CACHED_USED - else request.get_response_from_cached(save_cached=False) - ) - - # 校验 - if parser.validate(request, response) == False: - break - - else: - response = None - - if request.callback: # 如果有parser的回调函数,则用回调处理 - callback_parser = ( - request.callback - if callable(request.callback) - else tools.get_method(parser, request.callback) - ) - results = callback_parser(request, response) - else: # 否则默认用parser处理 - results = parser.parse(request, response) - - if results and not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") - ) - - # 此处判断是request 还是 item - for result in results or []: - if isinstance(result, Request): - # 给request的 parser_name 赋值 - result.parser_name = result.parser_name or parser.name - - # 判断是同步的callback还是异步的 - if result.request_sync: # 同步 - self.deal_request(result) - else: # 异步 - # 将next_request 入库 - self._request_buffer.put_request(result) - - elif isinstance(result, Item): - self._item_buffer.put_item(result) - elif result is not None: - function_name = "{}.{}".format( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - ) - raise TypeError( - f"{function_name} result expect Request or Item, bug get type: {type(result)}" - ) - - except Exception as e: - exception_type = ( - str(type(e)).replace("", "") - ) - if exception_type.startswith("requests"): - # 记录下载失败的文档 - self.record_download_status( - ParserControl.DOWNLOAD_EXCEPTION, parser.name - ) - if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: - request.del_proxy() - - else: - # 记录解析程序异常 - self.record_download_status( - ParserControl.PAESERS_EXCEPTION, parser.name - ) - - if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 - log.exception(e) - - log.error( - """ - -------------- %s.%s error ------------- - error %s - response %s - deal request %s - """ - % ( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - str(e), - response, - tools.dumps_json(request.to_dict, indent=28) - if setting.LOG_LEVEL == "DEBUG" - else request, - ) - ) + def _finish_request( + self, + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ): + assert request_redis is None + assert del_request_redis_after_item_to_db is False + assert del_request_redis_after_request_to_db is False + return - request.error_msg = "%s: %s" % (exception_type, e) - request.response = str(response) - - if "Invalid URL" in str(e): - request.is_abandoned = True - - requests = parser.exception_request(request, response, e) or [ - request - ] - if not isinstance(requests, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "exception_request") - ) - for request in requests: - if not isinstance(request, Request): - raise Exception("exception_request 需 yield request") - - if ( - request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES - or request.is_abandoned - ): - self.__class__._failed_task_count += 1 # 记录失败任务数 - - # 处理failed_request的返回值 request 或 func - results = parser.failed_request(request, response, e) or [ - request - ] - if not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "failed_request") - ) + def _download_response(self, request, response): + if response is None: + response = ( + request.get_response() + if not setting.RESPONSE_CACHED_USED + else request.get_response_from_cached(save_cached=False) + ) - log.info( - """ - 任务超过最大重试次数,丢弃 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, - ) - ) - - else: - # 将 requests 重新入库 爬取 - request.retry_times += 1 - request.filter_repeat = False - log.info( - """ - 入库 等待重试 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, - ) - ) - self._request_buffer.put_request(request) + return response - else: - # 记录下载成功的文档 - self.record_download_status( - ParserControl.DOWNLOAD_SUCCESS, parser.name - ) - # 记录成功任务数 - self.__class__._success_task_count += 1 + def _handle_air_exception(self, parser, request, response, error): + exception_type = str(type(error)).replace("", "") + if exception_type.startswith("requests"): + # 记录下载失败的文档 + self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name) + if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: + request.del_proxy() - # 缓存下载成功的文档 - if setting.RESPONSE_CACHED_ENABLE: - request.save_cached( - response=response, - expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, - ) + else: + # 记录解析程序异常 + self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name) - finally: - # 释放浏览器 - if response and getattr(response, "browser", None): - request.render_downloader.put_back(response.browser) + if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 + log.exception(error) - break + log.error( + """ + -------------- %s.%s error ------------- + error %s + response %s + deal request %s + """ + % ( + parser.name, + ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) + or "parse", + str(error), + response, + tools.dumps_json(request.to_dict, indent=28) + if setting.LOG_LEVEL == "DEBUG" + else request, + ) + ) + + request.error_msg = "%s: %s" % (exception_type, error) + request.response = str(response) + + if "Invalid URL" in str(error): + request.is_abandoned = True + + requests = parser.exception_request(request, response, error) or [request] + if not isinstance(requests, Iterable): + raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request")) + + for retry_request in requests: + if not isinstance(retry_request, Request): + raise Exception("exception_request 需 yield request") - if setting.SPIDER_SLEEP_TIME: if ( - isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list)) - and len(setting.SPIDER_SLEEP_TIME) == 2 + retry_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES + or retry_request.is_abandoned ): - sleep_time = random.randint( - int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1]) + self.__class__._failed_task_count += 1 # 记录失败任务数 + + # 处理failed_request的返回值 request + results = parser.failed_request(retry_request, response, error) or [ + retry_request + ] + if not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, "failed_request") + ) + + log.info( + """ + 任务超过最大重试次数,丢弃 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + retry_request.url, + retry_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) ) - time.sleep(sleep_time) + else: - time.sleep(setting.SPIDER_SLEEP_TIME) + # 将 requests 重新入库 爬取 + retry_request.retry_times += 1 + retry_request.filter_repeat = False + log.info( + """ + 入库 等待重试 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + retry_request.url, + retry_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) + ) + self._request_buffer.put_request(retry_request) + + def deal_request(self, request): + response = None + + parser = self._find_parser(request) + if not parser: + self._sleep_after_request() + return + + try: + self.__class__._total_task_count += 1 + # 记录需下载的文档 + self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name) + + # 解析request + request, response, used_download_midware_enable = self._prepare_response( + parser, request + ) + if request.auto_request: + response = self._download_response(request, response) + + # 校验 + if request.auto_request and parser.validate(request, response) == False: + return + + results = self._run_callback(parser, request, response) + self._dispatch_results(parser, request, results) + + except Exception as e: + self._handle_air_exception( + parser=parser, + request=request, + response=response, + error=e, + ) + + else: + self._record_success(parser, request, response) + + finally: + self._release_response_browser(request, response) + self._finish_request(None, False, False) + self._sleep_after_request() diff --git a/tests/test_parser_control_runtime.py b/tests/test_parser_control_runtime.py index bf4c3212..bf9b76f4 100644 --- a/tests/test_parser_control_runtime.py +++ b/tests/test_parser_control_runtime.py @@ -1,7 +1,7 @@ import pytest import feapder.setting as setting -from feapder.core.parser_control import ParserControl +from feapder.core.parser_control import AirSpiderParserControl, ParserControl from feapder.network.request import Request from feapder.network.item import Item @@ -72,6 +72,10 @@ def put_item(self, item): self.items.append(item) +def return_none_response(): + return None + + def build_control(): control = object.__new__(ParserControl) control._parsers = [] @@ -80,6 +84,14 @@ def build_control(): return control +def build_air_control(): + control = object.__new__(AirSpiderParserControl) + control._parsers = [] + control._request_buffer = FakeRequestBuffer() + control._item_buffer = FakeItemBuffer() + return control + + def test_find_parser_by_request_parser_name(): parser = FakeParser() control = build_control() @@ -229,3 +241,66 @@ def test_deal_request_dispatches_item_and_marks_request_for_item_delete(monkeypa assert isinstance(control._item_buffer.items[0], Item) assert control._item_buffer.items[1] == "raw-request" assert control._request_buffer.deleted == [] + + +def test_air_parser_control_uses_direct_sync_request_payload(): + control = build_air_control() + request = Request("https://example.com") + + assert control._make_sync_request_payload(request) is request + + +def test_air_parser_control_dispatches_item_without_redis_finish(monkeypatch): + parser = FakeParser() + control = build_air_control() + control._parsers = [parser] + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=False, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request(request) + + assert len(control._item_buffer.items) == 1 + assert isinstance(control._item_buffer.items[0], Item) + assert control._request_buffer.deleted == [] + + +def test_air_parser_control_allows_none_download_response(monkeypatch): + parser = FakeParserRecordingExceptions() + control = build_air_control() + control._parsers = [parser] + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=True, + ) + request.get_response = return_none_response + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request(request) + + assert parser.exception_requests == [] + assert parser.validated == [(request, None)] + assert parser.parsed == [(request, None)] + + +def test_air_parser_control_finish_request_rejects_redis_deletion_inputs(): + control = build_air_control() + + control._finish_request(None, False, False) + + with pytest.raises(AssertionError): + control._finish_request("raw-request", False, False) + + with pytest.raises(AssertionError): + control._finish_request(None, True, False) + + with pytest.raises(AssertionError): + control._finish_request(None, False, True) From 860e53b044f8910a38261622b697df21af5d7190 Mon Sep 17 00:00:00 2001 From: Boris <564773807@qq.com> Date: Mon, 1 Jun 2026 11:16:38 +0800 Subject: [PATCH 8/8] refactor: harden buffer flush state --- feapder/buffer/item_buffer.py | 193 +++++++++++++++++----------------- tests/test_runtime_status.py | 53 ++++++++++ 2 files changed, 150 insertions(+), 96 deletions(-) diff --git a/feapder/buffer/item_buffer.py b/feapder/buffer/item_buffer.py index f333ba07..80e21826 100644 --- a/feapder/buffer/item_buffer.py +++ b/feapder/buffer/item_buffer.py @@ -299,133 +299,134 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pip def __add_item_to_db( self, items, update_items, requests, callbacks, items_fingerprints ): - export_success = True self._is_adding_to_db = True + try: + export_success = True - # 去重 - if setting.ITEM_FILTER_ENABLE: - items, items_fingerprints = self.__dedup_items(items, items_fingerprints) + # 去重 + if setting.ITEM_FILTER_ENABLE: + items, items_fingerprints = self.__dedup_items(items, items_fingerprints) - # 分捡(返回值包含 pipelines_dict) - items_dict = self.__pick_items(items) - update_items_dict = self.__pick_items(update_items, is_update_item=True) + # 分捡(返回值包含 pipelines_dict) + items_dict = self.__pick_items(items) + update_items_dict = self.__pick_items(update_items, is_update_item=True) - # item批量入库 - failed_items = {"add": [], "update": [], "requests": []} - while items_dict: - table, datas = items_dict.popitem() - used_pipelines = self._item_pipelines.get(table) + # item批量入库 + failed_items = {"add": [], "update": [], "requests": []} + while items_dict: + table, datas = items_dict.popitem() + used_pipelines = self._item_pipelines.get(table) - log.debug( - """ + log.debug( + """ -------------- item 批量入库 -------------- 表名: %s datas: %s """ - % (table, tools.dumps_json(datas, indent=16)) - ) + % (table, tools.dumps_json(datas, indent=16)) + ) - if not self.__export_to_db(table, datas, used_pipelines=used_pipelines): - export_success = False - failed_items["add"].append({"table": table, "datas": datas}) + if not self.__export_to_db(table, datas, used_pipelines=used_pipelines): + export_success = False + failed_items["add"].append({"table": table, "datas": datas}) - # 执行批量update - while update_items_dict: - table, datas = update_items_dict.popitem() - used_pipelines = self._item_pipelines.get(table) + # 执行批量update + while update_items_dict: + table, datas = update_items_dict.popitem() + used_pipelines = self._item_pipelines.get(table) - log.debug( - """ + log.debug( + """ -------------- item 批量更新 -------------- 表名: %s datas: %s """ - % (table, tools.dumps_json(datas, indent=16)) - ) - - update_keys = self._item_update_keys.get(table) - if not self.__export_to_db( - table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines - ): - export_success = False - failed_items["update"].append( - {"table": table, "datas": datas, "update_keys": update_keys} + % (table, tools.dumps_json(datas, indent=16)) ) - if export_success: - # 执行回调 - while callbacks: - try: - callback = callbacks.pop(0) - callback() - except Exception as e: - log.exception(e) + update_keys = self._item_update_keys.get(table) + if not self.__export_to_db( + table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines + ): + export_success = False + failed_items["update"].append( + {"table": table, "datas": datas, "update_keys": update_keys} + ) - # 删除做过的request - if requests: - self.redis_db.zrem(self._table_request, requests) + if export_success: + # 执行回调 + while callbacks: + try: + callback = callbacks.pop(0) + callback() + except Exception as e: + log.exception(e) - # 去重入库 - if setting.ITEM_FILTER_ENABLE: - if items_fingerprints: - self.__class__.dedup.add(items_fingerprints, skip_check=True) - else: - failed_items["requests"] = requests + # 删除做过的request + if requests: + self.redis_db.zrem(self._table_request, requests) - if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES: - if self._redis_key != "air_spider": - # 失败的item记录到redis - self.redis_db.sadd(self._table_failed_items, failed_items) + # 去重入库 + if setting.ITEM_FILTER_ENABLE: + if items_fingerprints: + self.__class__.dedup.add(items_fingerprints, skip_check=True) + else: + failed_items["requests"] = requests - # 删除做过的request - if requests: - self.redis_db.zrem(self._table_request, requests) + if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES: + if self._redis_key != "air_spider": + # 失败的item记录到redis + self.redis_db.sadd(self._table_failed_items, failed_items) - log.error( - "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format( - tools.dumps_json(failed_items) - ) - ) - self.export_retry_times = 0 + # 删除做过的request + if requests: + self.redis_db.zrem(self._table_request, requests) - else: - tip = ["入库不成功"] - if callbacks: - tip.append("不执行回调") - if requests: - tip.append("不删除任务") - exists = self.redis_db.zexists(self._table_request, requests) - for exist, request in zip(exists, requests): - if exist: - self.redis_db.zadd(self._table_request, requests, 300) + log.error( + "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format( + tools.dumps_json(failed_items) + ) + ) + self.export_retry_times = 0 - if setting.ITEM_FILTER_ENABLE: - tip.append("数据不入去重库") + else: + tip = ["入库不成功"] + if callbacks: + tip.append("不执行回调") + if requests: + tip.append("不删除任务") + exists = self.redis_db.zexists(self._table_request, requests) + for exist, request in zip(exists, requests): + if exist: + self.redis_db.zadd(self._table_request, requests, 300) - if self._redis_key != "air_spider": - tip.append("将自动重试") + if setting.ITEM_FILTER_ENABLE: + tip.append("数据不入去重库") - tip.append("失败items:\n {}".format(tools.dumps_json(failed_items))) - log.error(",".join(tip)) + if self._redis_key != "air_spider": + tip.append("将自动重试") - self.export_falied_times += 1 + tip.append("失败items:\n {}".format(tools.dumps_json(failed_items))) + log.error(",".join(tip)) - if self._redis_key != "air_spider": - self.export_retry_times += 1 + self.export_falied_times += 1 - if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES: - # 报警 - msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format( - self._redis_key, self.export_falied_times - ) - log.error(msg) - tools.send_msg( - msg=msg, - level="error", - message_prefix="《%s》爬虫导出数据失败" % (self._redis_key), - ) + if self._redis_key != "air_spider": + self.export_retry_times += 1 - self._is_adding_to_db = False + if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES: + # 报警 + msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format( + self._redis_key, self.export_falied_times + ) + log.error(msg) + tools.send_msg( + msg=msg, + level="error", + message_prefix="《%s》爬虫导出数据失败" % (self._redis_key), + ) + finally: + self._is_adding_to_db = False def metric_datas(self, table, datas): """ diff --git a/tests/test_runtime_status.py b/tests/test_runtime_status.py index 1cc53e4e..c0701405 100644 --- a/tests/test_runtime_status.py +++ b/tests/test_runtime_status.py @@ -6,6 +6,7 @@ from feapder.core.collector import Collector from feapder.core.scheduler import Scheduler from feapder.core.runtime_state import RuntimeState +from feapder.network.item import Item class FakeRedis: @@ -185,3 +186,55 @@ def popleft(self): assert buffer._requests_deque.busy_observations assert all(buffer._requests_deque.busy_observations) assert buffer.is_idle() is True + + +def test_request_buffer_flush_resets_flag_when_zadd_raises(): + class ExplodingDB: + def zadd(self, table, values, prioritys=0): + raise RuntimeError("write failed") + + request = type( + "FakeRequest", + (), + { + "priority": 300, + "filter_repeat": False, + "url": "https://example.com", + "to_dict": {"url": "https://example.com"}, + }, + )() + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + buffer._db = ExplodingDB() + buffer._table_request = "test:z_requests" + buffer._table_failed_request = "test:z_failed_requests" + buffer._requests_deque.append(request) + + buffer.flush() + + assert buffer.is_adding_to_db() is False + + +def test_item_buffer_flush_resets_flag_when_export_raises(monkeypatch): + monkeypatch.setattr("feapder.buffer.item_buffer.setting.ITEM_FILTER_ENABLE", False) + item = Item(title="boom") + buffer = build_item_buffer(item_count=0, flushing=False) + buffer._items_queue.put(item) + buffer._redis_key = "test" + buffer._task_table = None + buffer._item_tables = {} + buffer._item_update_keys = {} + buffer._item_pipelines = {} + buffer._pipelines = [] + buffer._have_mysql_pipeline = True + buffer._mysql_pipeline = None + buffer.export_retry_times = 0 + buffer.export_falied_times = 0 + + def raise_export(table, datas, is_update=False, update_keys=(), used_pipelines=None): + raise RuntimeError("export failed") + + buffer._ItemBuffer__export_to_db = raise_export + + buffer.flush() + + assert buffer.is_adding_to_db() is False