diff --git a/.gitignore b/.gitignore
index fedead23..721f067a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
files/*
+docs/superpowers/
.DS_Store
.idea/*
*/.idea/*
@@ -15,4 +16,4 @@ dist/
media/
.MWebMetaData/
push.sh
-assets/
\ No newline at end of file
+assets/
diff --git a/README.md b/README.md
index 7bde6250..f13682fc 100644
--- a/README.md
+++ b/README.md
@@ -112,7 +112,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务,
### Rapidproxy代理
-
+
@@ -133,7 +133,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务,
### NovProxy
-
+g
diff --git a/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md
new file mode 100644
index 00000000..08b34b6f
--- /dev/null
+++ b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md
@@ -0,0 +1,214 @@
+# Core Runtime Refactor Design
+
+Date: 2026-05-31
+
+## Scope
+
+This refactor focuses on the runtime path that moves work through feapder:
+
+- `Scheduler`
+- `Collector`
+- `ParserControl`
+- `RequestBuffer`
+- `ItemBuffer`
+
+The first implementation phase keeps the public spider APIs compatible. It does not change how users subclass `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`, and it does not redesign their inheritance yet. The spider inheritance cleanup remains a follow-up phase after the runtime path is safer and better covered by tests.
+
+## Current Problems
+
+The runtime path works, but several responsibilities are tangled:
+
+- Thread lifecycle state is duplicated. Multiple classes maintain `_thread_stop`, `_is_adding_to_db`, `is_show_tip`, queue checks, and direct `_started.clear()` calls in slightly different ways.
+- `ParserControl.deal_request` is too large. It handles parser lookup, download middleware, response fetching, validation, callback execution, result routing, retry, failed request persistence, metrics, browser release, and request deletion in one method.
+- `ParserControl` and `AirSpiderParserControl` duplicate most request handling logic while differing mainly in queue backend and completion semantics.
+- `Scheduler.all_thread_is_done()` depends on implementation details from collector and buffers. This makes shutdown sensitive to timing and temporary state.
+- `RequestBuffer` and `ItemBuffer` both combine queueing, flushing, retry callbacks, persistence, and thread control. Their idle states are not explicit enough for a scheduler to reason about safely.
+- Result handling rules are repeated in several places. A yielded value can be `Request`, `Item`, `UpdateItem`, callable, or invalid, but the classification logic is not centralized.
+- Data safety concerns such as `eval`-based deserialization exist in adjacent runtime code. Replacing serialization is valuable, but it is not part of the first implementation slice unless touched code needs a small compatibility wrapper.
+
+## Goals
+
+1. Make runtime thread state explicit and consistent.
+2. Split request processing into small units with clear responsibilities.
+3. Reduce duplication between Redis-backed and in-memory parser control paths.
+4. Make scheduler shutdown decisions depend on a small runtime status interface instead of scattered implementation details.
+5. Preserve existing user-facing behavior, settings, Redis key formats, retry semantics, and callback semantics.
+6. Add lightweight tests that do not require Redis, MySQL, browsers, or network access.
+
+## Non-Goals
+
+- Do not change the public constructor signatures of `Scheduler`, `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`.
+- Do not rewrite the storage format for requests, responses, failed items, or failed requests in this phase.
+- Do not require Redis or MySQL for the new unit tests.
+- Do not remove existing settings or documented behavior.
+- Do not perform the spider inheritance refactor in this phase.
+
+## Proposed Architecture
+
+### Runtime Worker State
+
+Introduce a small shared lifecycle helper for runtime threads. The helper should expose:
+
+- `request_stop()`
+- `is_stop_requested`
+- `mark_busy()`
+- `mark_idle()`
+- `is_idle`
+
+`Collector`, `ParserControl`, `RequestBuffer`, and `ItemBuffer` can keep inheriting from `threading.Thread`, but their public state checks should move toward explicit methods:
+
+- `is_idle()`
+- `pending_count()`
+- `is_stopped()` where useful
+
+This allows `Scheduler` to ask each component for status instead of reading several unrelated flags and queues.
+
+### Runtime Status Interface
+
+Add a small internal status contract used by `Scheduler.all_thread_is_done()`:
+
+- Collector is done when it is idle and has no pending local or backend requests.
+- Parser controls are done when each worker is idle.
+- Request buffer is done when it has no queued writes, no queued deletions, and is not flushing.
+- Item buffer is done when it has no queued items and is not flushing.
+
+The scheduler should keep the current three-pass stability check, but delegate each component check to the component itself.
+
+### Parser Request Processing
+
+Split `ParserControl.deal_request` into private units:
+
+- `_find_parser(request)`
+- `_prepare_response(parser, request)`
+- `_run_callback(parser, request, response)`
+- `_dispatch_results(parser, request, results, request_redis)`
+- `_handle_exception(parser, request, response, error, request_redis, used_download_midware)`
+- `_finish_request(request_redis, finish_action)`
+- `_sleep_after_request()`
+
+The method `deal_request` remains as the public entry point for compatibility, but becomes orchestration code. This makes retry and success paths easier to test independently.
+
+### Result Dispatcher
+
+Introduce an internal result dispatcher for `Request`, `Item`, and callable values. It should encode the current behavior:
+
+- `Request` values receive a default `parser_name`.
+- Synchronous requests are processed immediately.
+- Asynchronous requests go to `RequestBuffer`.
+- `Item` and `UpdateItem` values go to `ItemBuffer`.
+- Callable values follow the current "previous result type" rule: after an item they are item callbacks; otherwise they are request callbacks.
+- Invalid non-`None` values raise `TypeError` with the parser and callback name.
+
+For the first phase, wire this dispatcher into `ParserControl`. Later phases can reuse it from `Scheduler`, `Spider`, `TaskSpider`, and `BatchSpider`.
+
+### ParserControl Variants
+
+Keep both `ParserControl` and `AirSpiderParserControl` classes for compatibility, but move shared behavior into a base implementation. The variants should differ only in:
+
+- How they receive work.
+- How they mark a request complete.
+- Whether failed requests are persisted to Redis.
+
+This avoids changing `AirSpider` behavior while removing duplicated parsing, retry, middleware, and result handling logic.
+
+### Buffer Semantics
+
+`RequestBuffer` should make these states explicit:
+
+- queued new requests
+- queued deletion requests
+- currently flushing
+
+`ItemBuffer` should make these states explicit:
+
+- queued items or callbacks
+- currently flushing
+- export retry counters
+
+Both buffers should keep `flush()` as a synchronous method. Their thread loop continues calling `flush()` periodically. `stop()` should request shutdown, and the scheduler should still call explicit flushing before treating the runtime as complete.
+
+### Error Handling
+
+The refactor must preserve current error behavior:
+
+- Request download exceptions continue to increment download exception metrics.
+- Parser exceptions continue to increment parser exception metrics.
+- Proxy deletion behavior remains based on `PROXY_MAX_FAILED_TIMES`.
+- `exception_request` and `failed_request` compatibility hooks remain intact.
+- Browser instances are returned to the render downloader in a `finally` block.
+- The original request is preserved when download middleware mutates a request and a retry or failed-request persistence is needed.
+
+Error handling should become easier to read by returning a small internal result from `_handle_exception`, such as whether to delete the active Redis request via the request buffer or via the item buffer.
+
+## Data Flow
+
+1. `Scheduler` starts buffers, collector, parser workers, retry handlers, and optional start request distribution.
+2. `Collector` atomically reserves ready Redis requests and exposes them through its local queue.
+3. `ParserControl` obtains one request, marks itself busy, finds the parser, downloads or reuses a response, validates it, runs the callback, and dispatches yielded results.
+4. `RequestBuffer` persists new requests and deletion markers.
+5. `ItemBuffer` persists items and deletes completed Redis requests after item persistence succeeds.
+6. `Scheduler` polls component status. It ends the spider only after all components report stable idle status.
+
+For `AirSpider`, the same parser-processing flow applies, but the request source is `MemoryDB` and request completion does not use Redis deletion markers.
+
+## Compatibility Strategy
+
+- Keep class names and imports stable.
+- Keep public method names such as `deal_request`, `flush`, `stop`, `is_not_task`, `get_requests_count`, and `is_adding_to_db`.
+- Add new internal methods instead of removing old methods immediately.
+- Preserve existing Redis key names and serialized values.
+- Preserve callback ordering and the current callable routing rule.
+- Preserve warning, logging, and metric names unless a test proves an existing message is wrong.
+
+## Testing Strategy
+
+Add focused unit tests around the new internal boundaries:
+
+- Collector idle and pending status with a fake RedisDB.
+- RequestBuffer status before enqueue, during flush, and after flush with a fake DB.
+- ItemBuffer status before enqueue, during flush, and after successful fake pipeline export.
+- ParserControl result dispatch for `Request`, synchronous `Request`, `Item`, callback, and invalid result.
+- ParserControl retry behavior for a controlled exception without network access.
+- Scheduler completion check using fake components that transition from busy to idle.
+
+Existing integration tests can remain as broader coverage. This phase should not require Redis, MySQL, Playwright, Selenium, or live HTTP.
+
+## Implementation Phases
+
+### Phase 1: Status Surfaces
+
+Add explicit idle and pending methods to collector and buffers. Update `Scheduler.all_thread_is_done()` to use these methods while preserving the three-pass stability check.
+
+### Phase 2: ParserControl Extraction
+
+Split `ParserControl.deal_request` into smaller private methods without changing behavior. Add tests for the extracted methods using fake parsers, fake buffers, and fake requests.
+
+### Phase 3: Shared Parser Runtime
+
+Move duplicated logic from `ParserControl` and `AirSpiderParserControl` into shared helpers or a shared base class. Keep the concrete classes as compatibility wrappers.
+
+### Phase 4: Runtime Result Dispatcher
+
+Introduce the internal dispatcher and wire it into parser controls. Keep legacy result behavior intact.
+
+### Phase 5: Buffer Stop and Flush Hardening
+
+Make stop behavior explicit. Ensure a stopped buffer can report pending work accurately and that final flushes are deterministic.
+
+## Acceptance Criteria
+
+- Public spider examples in the docs continue to run with the same API.
+- `Scheduler.all_thread_is_done()` no longer reads buffer and parser internals directly when an explicit status method exists.
+- `ParserControl.deal_request` is substantially shorter and delegates download, callback, dispatch, exception, and completion responsibilities.
+- `ParserControl` and `AirSpiderParserControl` share request-processing behavior instead of duplicating it.
+- New lightweight tests cover the status and dispatch behavior without external services.
+- Existing compatible tests still pass in the available local environment.
+
+## Follow-Up Work
+
+After this runtime refactor, a separate design should address the spider type hierarchy. The likely direction is a shared `BaseSpider` plus a distributed branch:
+
+- `BaseSpider -> AirSpider`
+- `BaseSpider -> Spider -> TaskSpider -> BatchSpider`
+
+That work should wait until the runtime path has clearer component boundaries and tests, because changing inheritance before stabilizing runtime behavior would make regressions harder to isolate.
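
The design document above lists `request_stop()`, `is_stop_requested`, `mark_busy()`, `mark_idle()`, and `is_idle` for the shared lifecycle helper, and the code in this change also calls `self._state.busy()` as a context manager. The file `feapder/core/runtime_state.py` is not shown in this part of the diff, so the following is only a minimal sketch of what such a helper could look like. The class name `RuntimeStateSketch` and all of its internals are assumptions, not the project's implementation.

```python
import threading
from contextlib import contextmanager


class RuntimeStateSketch:
    """Hypothetical stand-in for feapder.core.runtime_state.RuntimeState."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        self._busy_depth = 0  # counts nested busy sections

    def request_stop(self):
        # Ask the owning thread loop to exit at its next check.
        self._stop_event.set()

    @property
    def is_stop_requested(self):
        return self._stop_event.is_set()

    def mark_busy(self):
        with self._lock:
            self._busy_depth += 1

    def mark_idle(self):
        with self._lock:
            self._busy_depth = max(0, self._busy_depth - 1)

    @property
    def is_idle(self):
        with self._lock:
            return self._busy_depth == 0

    @contextmanager
    def busy(self):
        # Mark busy for the duration of the block, even if the block raises.
        self.mark_busy()
        try:
            yield self
        finally:
            self.mark_idle()
```

A depth counter is used here instead of a boolean so that nested busy sections would not report idle too early. Whether the real helper needs that is an open design choice.
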
diff --git a/feapder/buffer/item_buffer.py b/feapder/buffer/item_buffer.py
index 35f9bb01..80e21826 100644
--- a/feapder/buffer/item_buffer.py
+++ b/feapder/buffer/item_buffer.py
@@ -14,6 +14,7 @@
import feapder.utils.tools as tools
from feapder import setting
from feapder.db.redisdb import RedisDB
+from feapder.core.runtime_state import RuntimeState
from feapder.dedup import Dedup
from feapder.network.item import Item, UpdateItem
from feapder.pipelines import BasePipeline
@@ -31,6 +32,7 @@ class ItemBuffer(threading.Thread):
def __init__(self, redis_key, task_table=None):
if not hasattr(self, "_table_item"):
super(ItemBuffer, self).__init__()
+ self._state = RuntimeState()
self._thread_stop = False
self._is_adding_to_db = False
@@ -106,7 +108,8 @@ def mysql_pipeline(self):
def run(self):
self._thread_stop = False
- while not self._thread_stop:
+ self._state = RuntimeState()
+ while not self._state.is_stop_requested:
self.flush()
tools.delay_time(setting.ITEM_UPLOAD_INTERVAL)
@@ -114,6 +117,7 @@ def run(self):
def stop(self):
self._thread_stop = True
+ self._state.request_stop()
self._started.clear()
def put_item(self, item):
@@ -174,6 +178,15 @@ def flush(self):
def get_items_count(self):
return self._items_queue.qsize()
+ def pending_count(self):
+ return self.get_items_count()
+
+ def is_idle(self):
+ return self.pending_count() == 0 and not self.is_adding_to_db()
+
+ def is_stopped(self):
+ return self._state.is_stop_requested
+
def is_adding_to_db(self):
return self._is_adding_to_db
@@ -286,133 +299,134 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pip
def __add_item_to_db(
self, items, update_items, requests, callbacks, items_fingerprints
):
- export_success = True
self._is_adding_to_db = True
+ try:
+ export_success = True
- # Deduplicate
- if setting.ITEM_FILTER_ENABLE:
- items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
+ # Deduplicate
+ if setting.ITEM_FILTER_ENABLE:
+ items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
- # Sort items by table (the return value includes pipelines_dict)
- items_dict = self.__pick_items(items)
- update_items_dict = self.__pick_items(update_items, is_update_item=True)
+ # Sort items by table (the return value includes pipelines_dict)
+ items_dict = self.__pick_items(items)
+ update_items_dict = self.__pick_items(update_items, is_update_item=True)
- # Batch-insert items
- failed_items = {"add": [], "update": [], "requests": []}
- while items_dict:
- table, datas = items_dict.popitem()
- used_pipelines = self._item_pipelines.get(table)
+ # Batch-insert items
+ failed_items = {"add": [], "update": [], "requests": []}
+ while items_dict:
+ table, datas = items_dict.popitem()
+ used_pipelines = self._item_pipelines.get(table)
- log.debug(
- """
+ log.debug(
+ """
-------------- item 批量入库 --------------
表名: %s
datas: %s
"""
- % (table, tools.dumps_json(datas, indent=16))
- )
+ % (table, tools.dumps_json(datas, indent=16))
+ )
- if not self.__export_to_db(table, datas, used_pipelines=used_pipelines):
- export_success = False
- failed_items["add"].append({"table": table, "datas": datas})
+ if not self.__export_to_db(table, datas, used_pipelines=used_pipelines):
+ export_success = False
+ failed_items["add"].append({"table": table, "datas": datas})
- # Run batch updates
- while update_items_dict:
- table, datas = update_items_dict.popitem()
- used_pipelines = self._item_pipelines.get(table)
+ # Run batch updates
+ while update_items_dict:
+ table, datas = update_items_dict.popitem()
+ used_pipelines = self._item_pipelines.get(table)
- log.debug(
- """
+ log.debug(
+ """
-------------- item 批量更新 --------------
表名: %s
datas: %s
"""
- % (table, tools.dumps_json(datas, indent=16))
- )
-
- update_keys = self._item_update_keys.get(table)
- if not self.__export_to_db(
- table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines
- ):
- export_success = False
- failed_items["update"].append(
- {"table": table, "datas": datas, "update_keys": update_keys}
+ % (table, tools.dumps_json(datas, indent=16))
)
- if export_success:
- # Run callbacks
- while callbacks:
- try:
- callback = callbacks.pop(0)
- callback()
- except Exception as e:
- log.exception(e)
+ update_keys = self._item_update_keys.get(table)
+ if not self.__export_to_db(
+ table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines
+ ):
+ export_success = False
+ failed_items["update"].append(
+ {"table": table, "datas": datas, "update_keys": update_keys}
+ )
+
+ if export_success:
+ # Run callbacks
+ while callbacks:
+ try:
+ callback = callbacks.pop(0)
+ callback()
+ except Exception as e:
+ log.exception(e)
- # Delete completed requests
- if requests:
- self.redis_db.zrem(self._table_request, requests)
+ # Delete completed requests
+ if requests:
+ self.redis_db.zrem(self._table_request, requests)
- # Record fingerprints in the dedup store
- if setting.ITEM_FILTER_ENABLE:
- if items_fingerprints:
- self.__class__.dedup.add(items_fingerprints, skip_check=True)
- else:
- failed_items["requests"] = requests
+ # Record fingerprints in the dedup store
+ if setting.ITEM_FILTER_ENABLE:
+ if items_fingerprints:
+ self.__class__.dedup.add(items_fingerprints, skip_check=True)
+ else:
+ failed_items["requests"] = requests
- if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
- if self._redis_key != "air_spider":
- # Record failed items in Redis
- self.redis_db.sadd(self._table_failed_items, failed_items)
+ if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
+ if self._redis_key != "air_spider":
+ # Record failed items in Redis
+ self.redis_db.sadd(self._table_failed_items, failed_items)
- # Delete completed requests
- if requests:
- self.redis_db.zrem(self._table_request, requests)
+ # Delete completed requests
+ if requests:
+ self.redis_db.zrem(self._table_request, requests)
- log.error(
- "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
- tools.dumps_json(failed_items)
+ log.error(
+ "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
+ tools.dumps_json(failed_items)
+ )
)
- )
- self.export_retry_times = 0
+ self.export_retry_times = 0
- else:
- tip = ["入库不成功"]
- if callbacks:
- tip.append("不执行回调")
- if requests:
- tip.append("不删除任务")
- exists = self.redis_db.zexists(self._table_request, requests)
- for exist, request in zip(exists, requests):
- if exist:
- self.redis_db.zadd(self._table_request, requests, 300)
-
- if setting.ITEM_FILTER_ENABLE:
- tip.append("数据不入去重库")
+ else:
+ tip = ["入库不成功"]
+ if callbacks:
+ tip.append("不执行回调")
+ if requests:
+ tip.append("不删除任务")
+ exists = self.redis_db.zexists(self._table_request, requests)
+ for exist, request in zip(exists, requests):
+ if exist:
+ self.redis_db.zadd(self._table_request, request, 300)
- if self._redis_key != "air_spider":
- tip.append("将自动重试")
+ if setting.ITEM_FILTER_ENABLE:
+ tip.append("数据不入去重库")
- tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
- log.error(",".join(tip))
+ if self._redis_key != "air_spider":
+ tip.append("将自动重试")
- self.export_falied_times += 1
+ tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
+ log.error(",".join(tip))
- if self._redis_key != "air_spider":
- self.export_retry_times += 1
+ self.export_falied_times += 1
- if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
- # Send an alert
- msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format(
- self._redis_key, self.export_falied_times
- )
- log.error(msg)
- tools.send_msg(
- msg=msg,
- level="error",
- message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
- )
+ if self._redis_key != "air_spider":
+ self.export_retry_times += 1
- self._is_adding_to_db = False
+ if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
+ # Send an alert
+ msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format(
+ self._redis_key, self.export_falied_times
+ )
+ log.error(msg)
+ tools.send_msg(
+ msg=msg,
+ level="error",
+ message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
+ )
+ finally:
+ self._is_adding_to_db = False
def metric_datas(self, table, datas):
"""
diff --git a/feapder/buffer/request_buffer.py b/feapder/buffer/request_buffer.py
index 70677a94..f0c7be17 100644
--- a/feapder/buffer/request_buffer.py
+++ b/feapder/buffer/request_buffer.py
@@ -15,6 +15,7 @@
import feapder.utils.tools as tools
from feapder.db.memorydb import MemoryDB
from feapder.db.redisdb import RedisDB
+from feapder.core.runtime_state import RuntimeState
from feapder.dedup import Dedup
from feapder.utils.log import log
@@ -60,6 +61,7 @@ class RequestBuffer(AirSpiderRequestBuffer, threading.Thread):
def __init__(self, redis_key):
AirSpiderRequestBuffer.__init__(self, db=RedisDB(), dedup_name=redis_key)
threading.Thread.__init__(self)
+ self._state = RuntimeState()
self._thread_stop = False
self._is_adding_to_db = False
@@ -74,7 +76,8 @@ def __init__(self, redis_key):
def run(self):
self._thread_stop = False
- while not self._thread_stop:
+ self._state = RuntimeState()
+ while not self._state.is_stop_requested:
try:
self.__add_request_to_db()
except Exception as e:
@@ -84,6 +87,7 @@ def run(self):
def stop(self):
self._thread_stop = True
+ self._state.request_stop()
self._started.clear()
def put_request(self, request):
@@ -113,62 +117,74 @@ def flush(self):
def get_requests_count(self):
return len(self._requests_deque)
+ def get_delete_requests_count(self):
+ return len(self._del_requests_deque)
+
+ def pending_count(self):
+ return self.get_requests_count() + self.get_delete_requests_count()
+
+ def is_idle(self):
+ return self.pending_count() == 0 and not self.is_adding_to_db()
+
+ def is_stopped(self):
+ return self._state.is_stop_requested
+
def is_adding_to_db(self):
return self._is_adding_to_db
def __add_request_to_db(self):
+ self._is_adding_to_db = True
request_list = []
prioritys = []
callbacks = []
-
- while self._requests_deque:
- request = self._requests_deque.popleft()
- self._is_adding_to_db = True
-
- if callable(request):
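- # Callable
- # Note: mind closures. A callback created in a loop can bind the value as a default argument:
- # def test(xxx = xxx):
- # # TODO business logic that uses xxx
- # Written this way, xxx does not end up as the value from the last loop iteration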
- callbacks.append(request)
- continue
-
- priority = request.priority
-
- # Skip the request if dedup is enabled and it already exists in the store
- if self.is_exist_request(request):
- continue
- else:
- request_list.append(str(request.to_dict))
- prioritys.append(priority)
-
- if len(request_list) > MAX_URL_COUNT:
+ try:
+ while self._requests_deque:
+ request = self._requests_deque.popleft()
+
+ if callable(request):
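+ # Callable
+ # Note: mind closures. A callback created in a loop can bind the value as a default argument:
+ # def test(xxx = xxx):
+ # # TODO business logic that uses xxx
+ # Written this way, xxx does not end up as the value from the last loop iteration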
+ callbacks.append(request)
+ continue
+
+ priority = request.priority
+
+ # Skip the request if dedup is enabled and it already exists in the store
+ if self.is_exist_request(request):
+ continue
+ else:
+ request_list.append(str(request.to_dict))
+ prioritys.append(priority)
+
+ if len(request_list) > MAX_URL_COUNT:
+ self._db.zadd(self._table_request, request_list, prioritys)
+ request_list = []
+ prioritys = []
+
+ # Write to the database
+ if request_list:
self._db.zadd(self._table_request, request_list, prioritys)
- request_list = []
- prioritys = []
-
- # Write to the database
- if request_list:
- self._db.zadd(self._table_request, request_list, prioritys)
-
- # Run callbacks
- for callback in callbacks:
- try:
- callback()
- except Exception as e:
- log.exception(e)
-
- # Delete completed tasks
- if self._del_requests_deque:
- request_done_list = []
- while self._del_requests_deque:
- request_done_list.append(self._del_requests_deque.popleft())
-
- # Exclude requests that are in request_list; otherwise a request that was just added could be deleted
- request_done_list = list(set(request_done_list) - set(request_list))
- if request_done_list:
- self._db.zrem(self._table_request, request_done_list)
-
- self._is_adding_to_db = False
+ # Run callbacks
+ for callback in callbacks:
+ try:
+ callback()
+ except Exception as e:
+ log.exception(e)
+
+ # Delete completed tasks
+ if self._del_requests_deque:
+ request_done_list = []
+ while self._del_requests_deque:
+ request_done_list.append(self._del_requests_deque.popleft())
+
+ # Exclude requests that are in request_list; otherwise a request that was just added could be deleted
+ request_done_list = list(set(request_done_list) - set(request_list))
+
+ if request_done_list:
+ self._db.zrem(self._table_request, request_done_list)
+ finally:
+ self._is_adding_to_db = False
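
The reordering in `__add_request_to_db` raises `_is_adding_to_db` before the deque is drained and clears it in a `finally` block. The sketch below illustrates why that ordering matters for `is_idle()`: a batch that has been popped from the deque but not yet written still counts as work in progress, and a failed write cannot leave the flag stuck. `RequestBufferStatusSketch` and its `write` callable are hypothetical stand-ins, not the real `RequestBuffer` or `RedisDB`.

```python
from collections import deque


class RequestBufferStatusSketch:
    """Hypothetical stand-in for the status surface of RequestBuffer."""

    def __init__(self, write):
        self._requests_deque = deque()
        self._del_requests_deque = deque()
        self._is_adding_to_db = False
        self._write = write  # assumed callable standing in for the Redis write

    def put_request(self, request):
        self._requests_deque.append(request)

    def pending_count(self):
        return len(self._requests_deque) + len(self._del_requests_deque)

    def is_idle(self):
        return self.pending_count() == 0 and not self._is_adding_to_db

    def flush(self):
        # Raise the flag before draining, so a drained but unwritten batch
        # is still reported as work in progress.
        self._is_adding_to_db = True
        try:
            batch = []
            while self._requests_deque:
                batch.append(self._requests_deque.popleft())
            if batch:
                self._write(batch)
        finally:
            # Runs on success and on failure, so the flag cannot stay set.
            self._is_adding_to_db = False
```
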
diff --git a/feapder/core/collector.py b/feapder/core/collector.py
index 5b8ff652..2f9f2822 100644
--- a/feapder/core/collector.py
+++ b/feapder/core/collector.py
@@ -15,6 +15,7 @@
import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
+from feapder.core.runtime_state import RuntimeState
from feapder.network.request import Request
from feapder.utils.log import log
@@ -30,6 +31,7 @@ def __init__(self, redis_key):
"""
super(Collector, self).__init__()
+ self._state = RuntimeState()
self._db = RedisDB()
self._thread_stop = False
@@ -40,9 +42,11 @@ def __init__(self, redis_key):
def run(self):
self._thread_stop = False
- while not self._thread_stop:
+ self._state = RuntimeState()
+ while not self._state.is_stop_requested:
try:
- self.__input_data()
+ with self._state.busy():
+ self.__input_data()
except Exception as e:
log.exception(e)
time.sleep(0.1)
@@ -51,6 +55,7 @@ def run(self):
def stop(self):
self._thread_stop = True
+ self._state.request_stop()
self._started.clear()
def __input_data(self):
@@ -112,5 +117,14 @@ def get_requests_count(self):
self._todo_requests.qsize() or self._db.zget_count(self._tab_requests) or 0
)
+ def pending_count(self):
+ return self.get_requests_count()
+
+ def is_idle(self):
+ return not self.is_collector_task() and self.pending_count() == 0
+
+ def is_stopped(self):
+ return self._state.is_stop_requested
+
def is_collector_task(self):
return self._is_collector_task
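
The design document in this change says `Scheduler.all_thread_is_done()` keeps its three-pass stability check while delegating to each component's own status method. The scheduler changes are not shown in this part of the diff, so the following is only a sketch of that idea using fake components, in the spirit of the testing strategy. `FakeComponent`, `all_components_done`, and the `passes` and `pause` parameters are hypothetical names, and the real check may differ.

```python
import time


class FakeComponent:
    """Test double that reports busy for the first `busy_polls` status checks."""

    def __init__(self, busy_polls=0):
        self._busy_polls = busy_polls

    def is_idle(self):
        if self._busy_polls > 0:
            self._busy_polls -= 1
            return False
        return True


def all_components_done(components, passes=3, pause=0.0):
    """Return True only if every component is idle on each consecutive pass."""
    for _ in range(passes):
        if not all(component.is_idle() for component in components):
            return False
        time.sleep(pause)  # give in-flight work a chance to surface
    return True
```

Because the check only calls `is_idle()`, a test can drive it without Redis, MySQL, or real threads.
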
diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py
index 021d2956..646cb2a8 100644
--- a/feapder/core/parser_control.py
+++ b/feapder/core/parser_control.py
@@ -18,6 +18,8 @@
from feapder.buffer.item_buffer import ItemBuffer
from feapder.buffer.request_buffer import AirSpiderRequestBuffer
from feapder.core.base_parser import BaseParser
+from feapder.core.result_dispatcher import ResultDispatcher
+from feapder.core.runtime_state import RuntimeState
from feapder.db.memorydb import MemoryDB
from feapder.network.item import Item
from feapder.network.request import Request
@@ -42,6 +44,7 @@ class ParserControl(threading.Thread):
def __init__(self, collector, redis_key, request_buffer, item_buffer):
super(ParserControl, self).__init__()
+ self._state = RuntimeState()
self._parsers = []
self._collector = collector
self._redis_key = redis_key
@@ -52,7 +55,8 @@ def __init__(self, collector, redis_key, request_buffer, item_buffer):
def run(self):
self._thread_stop = False
- while not self._thread_stop:
+ self._state = RuntimeState()
+ while not self._state.is_stop_requested:
try:
request = self._collector.get_request()
if not request:
@@ -62,7 +66,8 @@ def run(self):
continue
self.is_show_tip = False
- self.deal_request(request)
+ with self._state.busy():
+ self.deal_request(request)
except Exception as e:
log.exception(e)
@@ -70,345 +75,141 @@ def run(self):
def is_not_task(self):
return self.is_show_tip
+ def is_idle(self):
+ return self.is_not_task() and self._state.is_idle
+
+ def is_stopped(self):
+ return self._state.is_stop_requested
+
@classmethod
def get_task_status_count(cls):
return cls._failed_task_count, cls._success_task_count, cls._total_task_count
- def deal_request(self, request):
- response = None
- request_redis = request["request_redis"]
- request = request["request_obj"]
-
- del_request_redis_after_item_to_db = False
- del_request_redis_after_request_to_db = False
+ def _make_sync_request_payload(self, request):
+ return {"request_obj": request, "request_redis": None}
+ def _find_parser(self, request):
for parser in self._parsers:
if parser.name == request.parser_name:
- used_download_midware_enable = False
- try:
- self.__class__._total_task_count += 1
- # Record the document to be downloaded
- self.record_download_status(
- ParserControl.DOWNLOAD_TOTAL, parser.name
- )
-
- # Process the request
- if request.auto_request:
- request_temp = None
- response = None
-
- # Download middleware
- if request.download_midware:
- if isinstance(request.download_midware, (list, tuple)):
- request_temp = request
- for download_midware in request.download_midware:
- download_midware = (
- download_midware
- if callable(download_midware)
- else tools.get_method(parser, download_midware)
- )
- request_temp = download_midware(request_temp)
- else:
- download_midware = (
- request.download_midware
- if callable(request.download_midware)
- else tools.get_method(
- parser, request.download_midware
- )
- )
- request_temp = download_midware(request)
- elif request.download_midware != False:
- request_temp = parser.download_midware(request)
-
- # Send the request
- if request_temp:
- if (
- isinstance(request_temp, (tuple, list))
- and len(request_temp) == 2
- ):
- request_temp, response = request_temp
-
- if not isinstance(request_temp, Request):
- raise Exception(
- "download_midware need return a request, but received type: {}".format(
- type(request_temp)
- )
- )
- used_download_midware_enable = True
- if response is None:
- response = (
- request_temp.get_response()
- if not setting.RESPONSE_CACHED_USED
- else request_temp.get_response_from_cached(
- save_cached=False
- )
- )
- else:
- response = (
- request.get_response()
- if not setting.RESPONSE_CACHED_USED
- else request.get_response_from_cached(save_cached=False)
- )
-
- if response == None:
- raise Exception(
- "连接超时 url: %s" % (request.url or request_temp.url)
- )
-
- # Validate
- if parser.validate(request, response) == False:
- break
-
- else:
- response = None
-
- if request.callback: # Use the callback when the request defines one
- callback_parser = (
- request.callback
- if callable(request.callback)
- else tools.get_method(parser, request.callback)
- )
- results = callback_parser(request, response)
- else: # Otherwise fall back to parser.parse
- results = parser.parse(request, response)
-
- if results and not isinstance(results, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse")
- )
-
- # Track the type of the previous result
- result_type = 0 # 0\1\2 (initial value\request\item)
- # Decide whether the result is a request or an item
- for result in results or []:
- if isinstance(result, Request):
- result_type = 1
- # Set the request's parser_name
- result.parser_name = result.parser_name or parser.name
-
- # Check whether the request is synchronous or asynchronous
- if result.request_sync: # synchronous
- request_dict = {
- "request_obj": result,
- "request_redis": None,
- }
- self.deal_request(request_dict)
- else: # asynchronous
- # Queue the next request for storage
- self._request_buffer.put_request(result)
- del_request_redis_after_request_to_db = True
-
- elif isinstance(result, Item):
- result_type = 2
- # Queue the item for storage
- self._item_buffer.put_item(result)
- # The in-progress request must be deleted afterwards
- del_request_redis_after_item_to_db = True
-
- elif callable(result): # result is a no-argument callable
- if result_type == 2: # item callback: runs after all buffered items are stored
- self._item_buffer.put_item(result)
- del_request_redis_after_item_to_db = True
-
- else: # result_type == 1: # request callback: runs after all buffered requests are stored; some parsers may yield a callback directly
- self._request_buffer.put_request(result)
- del_request_redis_after_request_to_db = True
-
- elif result is not None:
- function_name = "{}.{}".format(
- parser.name,
- (
- request.callback
- and callable(request.callback)
- and getattr(request.callback, "__name__")
- or request.callback
- )
- or "parse",
- )
- raise TypeError(
- f"{function_name} result expect Request、Item or callback, bug get type: {type(result)}"
- )
-
- except Exception as e:
- exception_type = (
- str(type(e)).replace("<class '", "").replace("'>", "")
- )
- if exception_type.startswith("requests"):
- # Record the failed download
- self.record_download_status(
- ParserControl.DOWNLOAD_EXCEPTION, parser.name
- )
- if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0:
- request.del_proxy()
-
- else:
- # Record the parser exception
- self.record_download_status(
- ParserControl.PAESERS_EXCEPTION, parser.name
- )
-
- if setting.LOG_LEVEL == "DEBUG": # Print only in debug mode; timeout tracebacks are very long
- log.exception(e)
-
- log.error(
- """
- -------------- %s.%s error -------------
- error %s
- response %s
- deal request %s
- """
- % (
- parser.name,
- (
- request.callback
- and callable(request.callback)
- and getattr(request.callback, "__name__")
- or request.callback
- )
- or "parse",
- str(e),
- response,
- tools.dumps_json(request.to_dict, indent=28)
- if setting.LOG_LEVEL == "DEBUG"
- else request,
- )
- )
+ return parser
+ return None
- request.error_msg = "%s: %s" % (exception_type, e)
- request.response = str(response)
-
- if "Invalid URL" in str(e):
- request.is_abandoned = True
-
- requests = parser.exception_request(request, response, e) or [
- request
- ]
- if not isinstance(requests, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
- )
- for request in requests:
- if callable(request):
- self._request_buffer.put_request(request)
- continue
-
- if not isinstance(request, Request):
- raise Exception("exception_request 需 yield request")
-
- if (
- request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
- or request.is_abandoned
- ):
- self.__class__._failed_task_count += 1 # 记录失败任务数
-
- # 处理failed_request的返回值 request 或 func
- results = parser.failed_request(request, response, e) or [
- request
- ]
- if not isinstance(results, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, "failed_request")
- )
+ def _prepare_response(self, parser, request):
+ used_download_midware_enable = False
+ if not request.auto_request:
+ return request, None, used_download_midware_enable
- for result in results:
- if isinstance(result, Request):
- if setting.SAVE_FAILED_REQUEST:
- if used_download_midware_enable:
- # 去掉download_midware 添加的属性
- original_request = (
- Request.from_dict(eval(request_redis))
- if request_redis
- else result
- )
- original_request.error_msg = (
- request.error_msg
- )
- original_request.response = request.response
-
- self._request_buffer.put_failed_request(
- original_request
- )
- else:
- self._request_buffer.put_failed_request(
- result
- )
-
- elif callable(result):
- self._request_buffer.put_request(result)
-
- elif isinstance(result, Item):
- self._item_buffer.put_item(result)
-
- del_request_redis_after_request_to_db = True
-
- else:
- # 将 requests 重新入库 爬取
- request.retry_times += 1
- request.filter_repeat = False
- log.info(
- """
- 入库 等待重试
- url %s
- 重试次数 %s
- 最大允许重试次数 %s"""
- % (
- request.url,
- request.retry_times,
- setting.SPIDER_MAX_RETRY_TIMES,
- )
- )
- if used_download_midware_enable:
- # 去掉download_midware 添加的属性 使用原来的requests
- original_request = (
- Request.from_dict(eval(request_redis))
- if request_redis
- else request
- )
- if hasattr(request, "error_msg"):
- original_request.error_msg = request.error_msg
- if hasattr(request, "response"):
- original_request.response = request.response
- original_request.retry_times = request.retry_times
- original_request.filter_repeat = request.filter_repeat
-
- self._request_buffer.put_request(original_request)
- else:
- self._request_buffer.put_request(request)
- del_request_redis_after_request_to_db = True
+ request_temp = None
+ response = None
- else:
- # 记录下载成功的文档
- self.record_download_status(
- ParserControl.DOWNLOAD_SUCCESS, parser.name
+ # Download middleware
+ if request.download_midware:
+ if isinstance(request.download_midware, (list, tuple)):
+ request_temp = request
+ for download_midware in request.download_midware:
+ download_midware = (
+ download_midware
+ if callable(download_midware)
+ else tools.get_method(parser, download_midware)
)
- # 记录成功任务数
- self.__class__._success_task_count += 1
-
- # 缓存下载成功的文档
- if setting.RESPONSE_CACHED_ENABLE:
- request.save_cached(
- response=response,
- expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
- )
-
- finally:
- # 释放浏览器
- if response and getattr(response, "browser", None):
- request.render_downloader.put_back(response.browser)
-
- break
-
- # 删除正在做的request 跟随item优先
- if request_redis:
- if del_request_redis_after_item_to_db:
- self._item_buffer.put_item(request_redis)
-
- elif del_request_redis_after_request_to_db:
- self._request_buffer.put_del_request(request_redis)
-
+ request_temp = download_midware(request_temp)
else:
- self._request_buffer.put_del_request(request_redis)
-
+ download_midware = (
+ request.download_midware
+ if callable(request.download_midware)
+ else tools.get_method(parser, request.download_midware)
+ )
+ request_temp = download_midware(request)
+ elif request.download_midware != False:
+ request_temp = parser.download_midware(request)
+
+ # Request
+ if request_temp:
+ if isinstance(request_temp, (tuple, list)) and len(request_temp) == 2:
+ request_temp, response = request_temp
+
+ if not isinstance(request_temp, Request):
+ raise Exception(
+ "download_midware need return a request, but received type: {}".format(
+ type(request_temp)
+ )
+ )
+ used_download_midware_enable = True
+ request = request_temp
+
+ return request, response, used_download_midware_enable
+
+ def _download_response(self, request, response):
+ if response is None:
+ response = (
+ request.get_response()
+ if not setting.RESPONSE_CACHED_USED
+ else request.get_response_from_cached(save_cached=False)
+ )
+
+ if response == None:
+ raise Exception("连接超时 url: %s" % request.url)
+
+ return response
+
+ def _run_callback(self, parser, request, response):
+ if request.callback: # Use the request's callback when one is set
+ callback_parser = (
+ request.callback
+ if callable(request.callback)
+ else tools.get_method(parser, request.callback)
+ )
+ return callback_parser(request, response)
+
+ # Otherwise fall back to parser.parse
+ return parser.parse(request, response)
+
+ def _dispatch_results(self, parser, request, results):
+ dispatcher = ResultDispatcher(
+ request_buffer=self._request_buffer,
+ item_buffer=self._item_buffer,
+ deal_request=self.deal_request,
+ sync_request_factory=self._make_sync_request_payload,
+ allow_callable=True,
+ )
+ return dispatcher.dispatch(parser, request, results)
+
+ def _finish_request(
+ self,
+ request_redis,
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ ):
+ # Delete the in-progress request; deletion via the item buffer takes priority
+ if not request_redis:
+ return
+
+ if del_request_redis_after_item_to_db:
+ self._item_buffer.put_item(request_redis)
+ elif del_request_redis_after_request_to_db:
+ self._request_buffer.put_del_request(request_redis)
+ else:
+ self._request_buffer.put_del_request(request_redis)
+
+ def _record_success(self, parser, request, response):
+ # Record the successful download
+ self.record_download_status(ParserControl.DOWNLOAD_SUCCESS, parser.name)
+ # Count the successful task
+ self.__class__._success_task_count += 1
+
+ # Cache the successfully downloaded response
+ if setting.RESPONSE_CACHED_ENABLE:
+ request.save_cached(
+ response=response,
+ expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
+ )
+
+ def _release_response_browser(self, request, response):
+ # Release the browser
+ if response and getattr(response, "browser", None):
+ request.render_downloader.put_back(response.browser)
+
+ def _sleep_after_request(self):
if setting.SPIDER_SLEEP_TIME:
if (
isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
@@ -421,6 +222,224 @@ def deal_request(self, request):
else:
time.sleep(setting.SPIDER_SLEEP_TIME)
+ def _original_request_after_middleware(self, request_redis, request):
+ return Request.from_dict(eval(request_redis)) if request_redis else request
+
+ def _handle_exception(
+ self,
+ parser,
+ request,
+ request_redis,
+ response,
+ exception,
+ used_download_midware_enable,
+ ):
+ del_request_redis_after_item_to_db = False
+ del_request_redis_after_request_to_db = False
+
+ exception_type = str(type(exception)).replace("<class '", "").replace("'>", "")
+ if exception_type.startswith("requests"):
+ # Record the failed download
+ self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name)
+ if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0:
+ request.del_proxy()
+
+ else:
+ # Record the parser exception
+ self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name)
+
+ if setting.LOG_LEVEL == "DEBUG": # Log the traceback only in DEBUG mode; timeout tracebacks are too verbose
+ log.exception(exception)
+
+ log.error(
+ """
+ -------------- %s.%s error -------------
+ error %s
+ response %s
+ deal request %s
+ """
+ % (
+ parser.name,
+ (
+ request.callback
+ and callable(request.callback)
+ and getattr(request.callback, "__name__")
+ or request.callback
+ )
+ or "parse",
+ str(exception),
+ response,
+ tools.dumps_json(request.to_dict, indent=28)
+ if setting.LOG_LEVEL == "DEBUG"
+ else request,
+ )
+ )
+
+ request.error_msg = "%s: %s" % (exception_type, exception)
+ request.response = str(response)
+
+ if "Invalid URL" in str(exception):
+ request.is_abandoned = True
+
+ requests = parser.exception_request(request, response, exception) or [request]
+ if not isinstance(requests, Iterable):
+ raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request"))
+
+ for exception_request in requests:
+ if callable(exception_request):
+ self._request_buffer.put_request(exception_request)
+ continue
+
+ if not isinstance(exception_request, Request):
+ raise Exception("exception_request 需 yield request")
+
+ if (
+ exception_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
+ or exception_request.is_abandoned
+ ):
+ self.__class__._failed_task_count += 1 # 记录失败任务数
+
+ # Handle failed_request's return values: a Request or a callable
+ results = parser.failed_request(
+ exception_request, response, exception
+ ) or [exception_request]
+ if not isinstance(results, Iterable):
+ raise Exception(
+ "%s.%s返回值必须可迭代" % (parser.name, "failed_request")
+ )
+
+ for result in results:
+ if isinstance(result, Request):
+ if setting.SAVE_FAILED_REQUEST:
+ if used_download_midware_enable:
+ # Drop the attributes added by download_midware
+ original_request = self._original_request_after_middleware(
+ request_redis, result
+ )
+ original_request.error_msg = exception_request.error_msg
+ original_request.response = exception_request.response
+
+ self._request_buffer.put_failed_request(
+ original_request
+ )
+ else:
+ self._request_buffer.put_failed_request(result)
+
+ elif callable(result):
+ self._request_buffer.put_request(result)
+
+ elif isinstance(result, Item):
+ self._item_buffer.put_item(result)
+
+ del_request_redis_after_request_to_db = True
+
+ else:
+ # Put the request back in the queue to be crawled again
+ exception_request.retry_times += 1
+ exception_request.filter_repeat = False
+ log.info(
+ """
+ 入库 等待重试
+ url %s
+ 重试次数 %s
+ 最大允许重试次数 %s"""
+ % (
+ exception_request.url,
+ exception_request.retry_times,
+ setting.SPIDER_MAX_RETRY_TIMES,
+ )
+ )
+ if used_download_midware_enable:
+ # Drop the attributes added by download_midware and use the original request
+ original_request = self._original_request_after_middleware(
+ request_redis, exception_request
+ )
+ if hasattr(exception_request, "error_msg"):
+ original_request.error_msg = exception_request.error_msg
+ if hasattr(exception_request, "response"):
+ original_request.response = exception_request.response
+ original_request.retry_times = exception_request.retry_times
+ original_request.filter_repeat = exception_request.filter_repeat
+
+ self._request_buffer.put_request(original_request)
+ else:
+ self._request_buffer.put_request(exception_request)
+ del_request_redis_after_request_to_db = True
+
+ return (
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ )
+
+ def deal_request(self, request):
+ response = None
+ request_redis = request["request_redis"]
+ request = request["request_obj"]
+
+ del_request_redis_after_item_to_db = False
+ del_request_redis_after_request_to_db = False
+ used_download_midware_enable = False
+
+ parser = self._find_parser(request)
+ if not parser:
+ self._finish_request(
+ request_redis,
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ )
+ self._sleep_after_request()
+ return
+
+ try:
+ self.__class__._total_task_count += 1
+ # Record the document that needs to be downloaded
+ self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name)
+
+ # Process the request
+ request, response, used_download_midware_enable = self._prepare_response(
+ parser, request
+ )
+ if request.auto_request:
+ response = self._download_response(request, response)
+
+ # Validate
+ if request.auto_request and parser.validate(request, response) == False:
+ return
+
+ results = self._run_callback(parser, request, response)
+ dispatch_result = self._dispatch_results(parser, request, results)
+ del_request_redis_after_item_to_db = (
+ dispatch_result.del_request_redis_after_item_to_db
+ )
+ del_request_redis_after_request_to_db = (
+ dispatch_result.del_request_redis_after_request_to_db
+ )
+
+ except Exception as e:
+ (
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ ) = self._handle_exception(
+ parser,
+ request,
+ request_redis,
+ response,
+ e,
+ used_download_midware_enable,
+ )
+
+ else:
+ self._record_success(parser, request, response)
+
+ finally:
+ self._release_response_browser(request, response)
+ self._finish_request(
+ request_redis,
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ )
+ self._sleep_after_request()
+
def record_download_status(self, status, spider):
"""
记录html等文档下载状态
@@ -431,6 +450,7 @@ def record_download_status(self, status, spider):
def stop(self):
self._thread_stop = True
+ self._state.request_stop()
self._started.clear()
def add_parser(self, parser: BaseParser):
@@ -467,6 +487,7 @@ def __init__(
item_buffer: ItemBuffer,
):
super(ParserControl, self).__init__()
+ self._state = RuntimeState()
self._parsers = []
self._memory_db = memory_db
self._thread_stop = False
@@ -474,7 +495,9 @@ def __init__(
self._item_buffer = item_buffer
def run(self):
- while not self._thread_stop:
+ self._thread_stop = False
+ self._state = RuntimeState()
+ while not self._state.is_stop_requested:
try:
request = self._memory_db.get()
if not request:
@@ -484,264 +507,184 @@ def run(self):
continue
self.is_show_tip = False
- self.deal_request(request)
+ with self._state.busy():
+ self.deal_request(request)
except Exception as e:
log.exception(e)
- def deal_request(self, request):
- response = None
+ def _make_sync_request_payload(self, request):
+ return request
- for parser in self._parsers:
- if parser.name == request.parser_name:
- try:
- self.__class__._total_task_count += 1
- # 记录需下载的文档
- self.record_download_status(
- ParserControl.DOWNLOAD_TOTAL, parser.name
- )
+ def _dispatch_results(self, parser, request, results):
+ dispatcher = ResultDispatcher(
+ request_buffer=self._request_buffer,
+ item_buffer=self._item_buffer,
+ deal_request=self.deal_request,
+ sync_request_factory=self._make_sync_request_payload,
+ allow_callable=False,
+ )
+ return dispatcher.dispatch(parser, request, results)
- # 解析request
- if request.auto_request:
- request_temp = None
- response = None
-
- # 下载中间件
- if request.download_midware:
- if isinstance(request.download_midware, (list, tuple)):
- request_temp = request
- for download_midware in request.download_midware:
- download_midware = (
- download_midware
- if callable(download_midware)
- else tools.get_method(parser, download_midware)
- )
- request_temp = download_midware(request_temp)
- else:
- download_midware = (
- request.download_midware
- if callable(request.download_midware)
- else tools.get_method(
- parser, request.download_midware
- )
- )
- request_temp = download_midware(request)
- elif request.download_midware != False:
- request_temp = parser.download_midware(request)
-
- # 请求
- if request_temp:
- if (
- isinstance(request_temp, (tuple, list))
- and len(request_temp) == 2
- ):
- request_temp, response = request_temp
-
- if not isinstance(request_temp, Request):
- raise Exception(
- "download_midware need return a request, but received type: {}".format(
- type(request_temp)
- )
- )
- request = request_temp
-
- if response is None:
- response = (
- request.get_response()
- if not setting.RESPONSE_CACHED_USED
- else request.get_response_from_cached(save_cached=False)
- )
-
- # 校验
- if parser.validate(request, response) == False:
- break
-
- else:
- response = None
-
- if request.callback: # 如果有parser的回调函数,则用回调处理
- callback_parser = (
- request.callback
- if callable(request.callback)
- else tools.get_method(parser, request.callback)
- )
- results = callback_parser(request, response)
- else: # 否则默认用parser处理
- results = parser.parse(request, response)
-
- if results and not isinstance(results, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse")
- )
-
- # 此处判断是request 还是 item
- for result in results or []:
- if isinstance(result, Request):
- # 给request的 parser_name 赋值
- result.parser_name = result.parser_name or parser.name
-
- # 判断是同步的callback还是异步的
- if result.request_sync: # 同步
- self.deal_request(result)
- else: # 异步
- # 将next_request 入库
- self._request_buffer.put_request(result)
-
- elif isinstance(result, Item):
- self._item_buffer.put_item(result)
- elif result is not None:
- function_name = "{}.{}".format(
- parser.name,
- (
- request.callback
- and callable(request.callback)
- and getattr(request.callback, "__name__")
- or request.callback
- )
- or "parse",
- )
- raise TypeError(
- f"{function_name} result expect Request or Item, bug get type: {type(result)}"
- )
-
- except Exception as e:
- exception_type = (
- str(type(e)).replace("<class '", "").replace("'>", "")
- )
- if exception_type.startswith("requests"):
- # 记录下载失败的文档
- self.record_download_status(
- ParserControl.DOWNLOAD_EXCEPTION, parser.name
- )
- if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0:
- request.del_proxy()
-
- else:
- # 记录解析程序异常
- self.record_download_status(
- ParserControl.PAESERS_EXCEPTION, parser.name
- )
-
- if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多
- log.exception(e)
-
- log.error(
- """
- -------------- %s.%s error -------------
- error %s
- response %s
- deal request %s
- """
- % (
- parser.name,
- (
- request.callback
- and callable(request.callback)
- and getattr(request.callback, "__name__")
- or request.callback
- )
- or "parse",
- str(e),
- response,
- tools.dumps_json(request.to_dict, indent=28)
- if setting.LOG_LEVEL == "DEBUG"
- else request,
- )
- )
-
- request.error_msg = "%s: %s" % (exception_type, e)
- request.response = str(response)
-
- if "Invalid URL" in str(e):
- request.is_abandoned = True
-
- requests = parser.exception_request(request, response, e) or [
- request
- ]
- if not isinstance(requests, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
- )
- for request in requests:
- if not isinstance(request, Request):
- raise Exception("exception_request 需 yield request")
-
- if (
- request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
- or request.is_abandoned
- ):
- self.__class__._failed_task_count += 1 # 记录失败任务数
-
- # 处理failed_request的返回值 request 或 func
- results = parser.failed_request(request, response, e) or [
- request
- ]
- if not isinstance(results, Iterable):
- raise Exception(
- "%s.%s返回值必须可迭代" % (parser.name, "failed_request")
- )
-
- log.info(
- """
- 任务超过最大重试次数,丢弃
- url %s
- 重试次数 %s
- 最大允许重试次数 %s"""
- % (
- request.url,
- request.retry_times,
- setting.SPIDER_MAX_RETRY_TIMES,
- )
- )
-
- else:
- # 将 requests 重新入库 爬取
- request.retry_times += 1
- request.filter_repeat = False
- log.info(
- """
- 入库 等待重试
- url %s
- 重试次数 %s
- 最大允许重试次数 %s"""
- % (
- request.url,
- request.retry_times,
- setting.SPIDER_MAX_RETRY_TIMES,
- )
- )
- self._request_buffer.put_request(request)
+ def _finish_request(
+ self,
+ request_redis,
+ del_request_redis_after_item_to_db,
+ del_request_redis_after_request_to_db,
+ ):
+ assert request_redis is None
+ assert del_request_redis_after_item_to_db is False
+ assert del_request_redis_after_request_to_db is False
+ return
+
+ def _download_response(self, request, response):
+ if response is None:
+ response = (
+ request.get_response()
+ if not setting.RESPONSE_CACHED_USED
+ else request.get_response_from_cached(save_cached=False)
+ )
+
+ return response
+
+ def _handle_air_exception(self, parser, request, response, error):
+ exception_type = str(type(error)).replace("<class '", "").replace("'>", "")
+ if exception_type.startswith("requests"):
+ # Record the failed download
+ self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name)
+ if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0:
+ request.del_proxy()
+
+ else:
+ # Record the parser exception
+ self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name)
+
+ if setting.LOG_LEVEL == "DEBUG": # Log the traceback only in DEBUG mode; timeout tracebacks are too verbose
+ log.exception(error)
+
+ log.error(
+ """
+ -------------- %s.%s error -------------
+ error %s
+ response %s
+ deal request %s
+ """
+ % (
+ parser.name,
+ (
+ request.callback
+ and callable(request.callback)
+ and getattr(request.callback, "__name__")
+ or request.callback
+ )
+ or "parse",
+ str(error),
+ response,
+ tools.dumps_json(request.to_dict, indent=28)
+ if setting.LOG_LEVEL == "DEBUG"
+ else request,
+ )
+ )
- else:
- # 记录下载成功的文档
- self.record_download_status(
- ParserControl.DOWNLOAD_SUCCESS, parser.name
- )
- # 记录成功任务数
- self.__class__._success_task_count += 1
+ request.error_msg = "%s: %s" % (exception_type, error)
+ request.response = str(response)
- # 缓存下载成功的文档
- if setting.RESPONSE_CACHED_ENABLE:
- request.save_cached(
- response=response,
- expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
- )
+ if "Invalid URL" in str(error):
+ request.is_abandoned = True
- finally:
- # 释放浏览器
- if response and getattr(response, "browser", None):
- request.render_downloader.put_back(response.browser)
+ requests = parser.exception_request(request, response, error) or [request]
+ if not isinstance(requests, Iterable):
+ raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request"))
- break
+ for retry_request in requests:
+ if not isinstance(retry_request, Request):
+ raise Exception("exception_request 需 yield request")
- if setting.SPIDER_SLEEP_TIME:
if (
- isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
- and len(setting.SPIDER_SLEEP_TIME) == 2
+ retry_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
+ or retry_request.is_abandoned
):
- sleep_time = random.randint(
- int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1])
+ self.__class__._failed_task_count += 1 # 记录失败任务数
+
+ # Handle failed_request's return value: a Request
+ results = parser.failed_request(retry_request, response, error) or [
+ retry_request
+ ]
+ if not isinstance(results, Iterable):
+ raise Exception(
+ "%s.%s返回值必须可迭代" % (parser.name, "failed_request")
+ )
+
+ log.info(
+ """
+ 任务超过最大重试次数,丢弃
+ url %s
+ 重试次数 %s
+ 最大允许重试次数 %s"""
+ % (
+ retry_request.url,
+ retry_request.retry_times,
+ setting.SPIDER_MAX_RETRY_TIMES,
+ )
)
- time.sleep(sleep_time)
+
else:
- time.sleep(setting.SPIDER_SLEEP_TIME)
+ # Put the request back in the queue to be crawled again
+ retry_request.retry_times += 1
+ retry_request.filter_repeat = False
+ log.info(
+ """
+ 入库 等待重试
+ url %s
+ 重试次数 %s
+ 最大允许重试次数 %s"""
+ % (
+ retry_request.url,
+ retry_request.retry_times,
+ setting.SPIDER_MAX_RETRY_TIMES,
+ )
+ )
+ self._request_buffer.put_request(retry_request)
+
+ def deal_request(self, request):
+ response = None
+
+ parser = self._find_parser(request)
+ if not parser:
+ self._sleep_after_request()
+ return
+
+ try:
+ self.__class__._total_task_count += 1
+ # Record the document that needs to be downloaded
+ self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name)
+
+ # Process the request
+ request, response, used_download_midware_enable = self._prepare_response(
+ parser, request
+ )
+ if request.auto_request:
+ response = self._download_response(request, response)
+
+ # Validate
+ if request.auto_request and parser.validate(request, response) == False:
+ return
+
+ results = self._run_callback(parser, request, response)
+ self._dispatch_results(parser, request, results)
+
+ except Exception as e:
+ self._handle_air_exception(
+ parser=parser,
+ request=request,
+ response=response,
+ error=e,
+ )
+
+ else:
+ self._record_success(parser, request, response)
+
+ finally:
+ self._release_response_browser(request, response)
+ self._finish_request(None, False, False)
+ self._sleep_after_request()
diff --git a/feapder/core/result_dispatcher.py b/feapder/core/result_dispatcher.py
new file mode 100644
index 00000000..43186cd7
--- /dev/null
+++ b/feapder/core/result_dispatcher.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+"""
+Parser result routing for runtime parser controls.
+"""
+from collections.abc import Iterable
+from dataclasses import dataclass
+
+from feapder.network.item import Item
+from feapder.network.request import Request
+
+
+@dataclass
+class DispatchResult:
+ del_request_redis_after_item_to_db: bool = False
+ del_request_redis_after_request_to_db: bool = False
+
+
+class ResultDispatcher:
+ REQUEST_RESULT = 1
+ ITEM_RESULT = 2
+
+ def __init__(
+ self,
+ *,
+ request_buffer,
+ item_buffer,
+ deal_request,
+ sync_request_factory=None,
+ allow_callable=True,
+ ):
+ self._request_buffer = request_buffer
+ self._item_buffer = item_buffer
+ self._deal_request = deal_request
+ self._sync_request_factory = sync_request_factory or (lambda request: request)
+ self._allow_callable = allow_callable
+
+ def dispatch(self, parser, request, results):
+ if results and not isinstance(results, Iterable):
+ raise Exception(
+ "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse")
+ )
+
+ dispatch_result = DispatchResult()
+ result_type = 0
+
+ for result in results or []:
+ if isinstance(result, Request):
+ result_type = self.REQUEST_RESULT
+ result.parser_name = result.parser_name or parser.name
+ if result.request_sync:
+ self._deal_request(self._sync_request_factory(result))
+ else:
+ self._request_buffer.put_request(result)
+ dispatch_result.del_request_redis_after_request_to_db = True
+
+ elif isinstance(result, Item):
+ result_type = self.ITEM_RESULT
+ self._item_buffer.put_item(result)
+ dispatch_result.del_request_redis_after_item_to_db = True
+
+ elif callable(result) and self._allow_callable:
+ if result_type == self.ITEM_RESULT:
+ self._item_buffer.put_item(result)
+ dispatch_result.del_request_redis_after_item_to_db = True
+ else:
+ self._request_buffer.put_request(result)
+ dispatch_result.del_request_redis_after_request_to_db = True
+
+ elif result is not None:
+ raise TypeError(self._format_type_error(parser, request, result))
+
+ return dispatch_result
+
+ def _format_type_error(self, parser, request, result):
+ callback_name = (
+ request.callback
+ and callable(request.callback)
+ and getattr(request.callback, "__name__")
+ or request.callback
+ ) or "parse"
+ expected = "Request, Item or callback" if self._allow_callable else "Request or Item"
+ return (
+ f"{parser.name}.{callback_name} result expects {expected}, "
+ f"but got type: {type(result)}"
+ )
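Review note: the routing rules in `ResultDispatcher.dispatch` can be exercised without feapder installed. The sketch below is a condensed restatement under stated assumptions, not the patch's code: `Request` and `Item` are stand-in classes, `Routed` and `route` are hypothetical names, plain lists replace the real buffers, and the synchronous-request branch is left out.

```python
from dataclasses import dataclass, field


class Request:
    """Stand-in for feapder.network.request.Request (assumption)."""

    def __init__(self, url):
        self.url = url


class Item:
    """Stand-in for feapder.network.item.Item (assumption)."""


@dataclass
class Routed:
    # Lists stand in for RequestBuffer / ItemBuffer.
    requests: list = field(default_factory=list)
    items: list = field(default_factory=list)
    delete_after_item: bool = False
    delete_after_request: bool = False


def route(results, allow_callable=True):
    """Route parser results the way dispatch() does, minus sync requests."""
    routed = Routed()
    last_was_item = False
    for result in results or []:
        if isinstance(result, Request):
            last_was_item = False
            routed.requests.append(result)
            routed.delete_after_request = True
        elif isinstance(result, Item):
            last_was_item = True
            routed.items.append(result)
            routed.delete_after_item = True
        elif callable(result) and allow_callable:
            # A callback follows whichever kind of result came before it.
            if last_was_item:
                routed.items.append(result)
                routed.delete_after_item = True
            else:
                routed.requests.append(result)
                routed.delete_after_request = True
        elif result is not None:
            raise TypeError(f"unexpected result type: {type(result)}")
    return routed
```

With `allow_callable=False`, which is how the AirSpider control constructs the dispatcher in this patch, a yielded callback falls through to the `TypeError` branch instead of being queued.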
diff --git a/feapder/core/runtime_state.py b/feapder/core/runtime_state.py
new file mode 100644
index 00000000..a8cac210
--- /dev/null
+++ b/feapder/core/runtime_state.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+"""
+Small runtime state helper for background runtime threads.
+"""
+from contextlib import contextmanager
+from threading import RLock
+
+
+class RuntimeState:
+ def __init__(self):
+ self._stop_requested = False
+ self._busy_count = 0
+ self._lock = RLock()
+
+ def request_stop(self):
+ with self._lock:
+ self._stop_requested = True
+
+ @property
+ def is_stop_requested(self):
+ with self._lock:
+ return self._stop_requested
+
+ def mark_busy(self):
+ with self._lock:
+ self._busy_count += 1
+
+ def mark_idle(self):
+ with self._lock:
+ if self._busy_count > 0:
+ self._busy_count -= 1
+
+ @property
+ def busy_count(self):
+ with self._lock:
+ return self._busy_count
+
+ @property
+ def is_idle(self):
+ return self.busy_count == 0
+
+ @contextmanager
+ def busy(self):
+ self.mark_busy()
+ try:
+ yield
+ finally:
+ self.mark_idle()
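Review note: a minimal sketch of how a worker loop might use this helper, assuming a condensed copy of `RuntimeState` so the snippet runs standalone. `run_worker`, `tasks`, and `handled` are hypothetical names for illustration; the real loop is `ParserControl.run` in this patch.

```python
import threading
import time
from contextlib import contextmanager


class RuntimeState:
    """Condensed copy of the helper above, so the sketch is self-contained."""

    def __init__(self):
        self._stop_requested = False
        self._busy_count = 0
        self._lock = threading.RLock()

    def request_stop(self):
        with self._lock:
            self._stop_requested = True

    @property
    def is_stop_requested(self):
        with self._lock:
            return self._stop_requested

    @property
    def is_idle(self):
        with self._lock:
            return self._busy_count == 0

    @contextmanager
    def busy(self):
        with self._lock:
            self._busy_count += 1
        try:
            yield
        finally:
            # Always decrement, even if the wrapped work raises.
            with self._lock:
                self._busy_count = max(0, self._busy_count - 1)


def run_worker(state, tasks, handled):
    """Hypothetical worker loop: poll for work until a stop is requested."""
    while not state.is_stop_requested:
        try:
            task = tasks.pop()
        except IndexError:
            time.sleep(0.01)
            continue
        # The state reports busy only while a task is being handled.
        with state.busy():
            handled.append(task)
```

In this sketch there is a short window between `tasks.pop()` and entering `busy()` where the queue is empty and the state still reports idle. Transient states of that kind are what the repeated idle checks in `all_thread_is_done` are meant to absorb.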
diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py
index 0177d185..7fb1fcc0 100644
--- a/feapder/core/scheduler.py
+++ b/feapder/core/scheduler.py
@@ -286,32 +286,19 @@ def _start(self):
self.__add_task()
def all_thread_is_done(self):
- # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性
+ # Check three times to avoid transient idle states between runtime stages.
for i in range(3):
- # 检测 collector 状态
- if (
- self._collector.is_collector_task()
- or self._collector.get_requests_count() > 0
- ):
+ if not self._collector.is_idle():
return False
- # 检测 parser_control 状态
for parser_control in self._parser_controls:
- if not parser_control.is_not_task():
+ if not parser_control.is_idle():
return False
- # 检测 item_buffer 状态
- if (
- self._item_buffer.get_items_count() > 0
- or self._item_buffer.is_adding_to_db()
- ):
+ if not self._item_buffer.is_idle():
return False
- # 检测 request_buffer 状态
- if (
- self._request_buffer.get_requests_count() > 0
- or self._request_buffer.is_adding_to_db()
- ):
+ if not self._request_buffer.is_idle():
return False
tools.delay_time(1)
diff --git a/feapder/core/spiders/air_spider.py b/feapder/core/spiders/air_spider.py
index 70c30112..f51f8930 100644
--- a/feapder/core/spiders/air_spider.py
+++ b/feapder/core/spiders/air_spider.py
@@ -57,21 +57,16 @@ def distribute_task(self):
self._request_buffer.put_request(request, ignore_max_size=False)
def all_thread_is_done(self):
- for i in range(3): # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性
- # 检测 parser_control 状态
+ # Check three times to avoid transient idle states between runtime stages.
+ for i in range(3):
for parser_control in self._parser_controls:
- if not parser_control.is_not_task():
+ if not parser_control.is_idle():
return False
- # 检测 任务队列 状态
if not self._memory_db.empty():
return False
- # 检测 item_buffer 状态
- if (
- self._item_buffer.get_items_count() > 0
- or self._item_buffer.is_adding_to_db()
- ):
+ if not self._item_buffer.is_idle():
return False
tools.delay_time(1)
diff --git a/feapder/db/redisdb.py b/feapder/db/redisdb.py
index d882e687..ee4e0cd9 100644
--- a/feapder/db/redisdb.py
+++ b/feapder/db/redisdb.py
@@ -6,11 +6,11 @@
---------
@author: Boris
"""
-import os
import time
from typing import Union, List
import redis
+from redis.cluster import ClusterNode, RedisCluster
from redis.connection import Encoder as _Encoder
from redis.exceptions import ConnectionError, TimeoutError
from redis.exceptions import DataError
@@ -140,29 +140,24 @@ def get_connect(self):
startup_nodes = []
for ip_port in ip_ports:
ip, port = ip_port.split(":")
- startup_nodes.append({"host": ip, "port": port})
+ startup_nodes.append(ClusterNode(ip, int(port)))
if self._service_name:
# log.debug("使用redis哨兵模式")
- hosts = [(node["host"], node["port"]) for node in startup_nodes]
+ hosts = [(node.host, node.port) for node in startup_nodes]
sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs)
self._redis = sentinel.master_for(
self._service_name,
password=self._user_pass,
db=self._db,
- redis_class=redis.StrictRedis,
+ redis_class=redis.Redis,
decode_responses=self._decode_responses,
max_connections=self._max_connections,
**self._kwargs,
)
+ self._is_redis_cluster = False
else:
- try:
- from rediscluster import RedisCluster
- except ModuleNotFoundError as e:
- log.error('请安装 pip install "feapder[all]"')
- os._exit(0)
-
# log.debug("使用redis集群模式")
self._redis = RedisCluster(
startup_nodes=startup_nodes,
@@ -171,11 +166,10 @@ def get_connect(self):
max_connections=self._max_connections,
**self._kwargs,
)
-
- self._is_redis_cluster = True
+ self._is_redis_cluster = True
else:
ip, port = ip_ports[0].split(":")
- self._redis = redis.StrictRedis(
+ self._redis = redis.Redis(
host=ip,
port=port,
db=self._db,
@@ -186,7 +180,7 @@ def get_connect(self):
)
self._is_redis_cluster = False
else:
- self._redis = redis.StrictRedis.from_url(
+ self._redis = redis.Redis.from_url(
self._url, decode_responses=self._decode_responses, **self._kwargs
)
self._is_redis_cluster = False
@@ -573,7 +567,8 @@ def zexists(self, table, values):
if isinstance(values, list):
pipe = self._redis.pipeline()
- pipe.multi()
+ if not self._is_redis_cluster:
+ pipe.multi()
for value in values:
pipe.zscore(table, value)
is_exists_temp = pipe.execute()
@@ -773,7 +768,8 @@ def setbit(
else:
assert len(offsets) == len(values), "offsets值要与values值一一对应"
pipe = self._redis.pipeline()
- pipe.multi()
+ if not self._is_redis_cluster:
+ pipe.multi()
for offset, value in zip(offsets, values):
pipe.setbit(table, offset, value)
@@ -792,7 +788,8 @@ def getbit(self, table, offsets):
"""
if isinstance(offsets, list):
pipe = self._redis.pipeline()
- pipe.multi()
+ if not self._is_redis_cluster:
+ pipe.multi()
for offset in offsets:
pipe.getbit(table, offset)
diff --git a/feapder/network/proxy_pool_old.py b/feapder/network/proxy_pool_old.py
index 2e3bb6c1..fddaabc1 100644
--- a/feapder/network/proxy_pool_old.py
+++ b/feapder/network/proxy_pool_old.py
@@ -147,7 +147,7 @@ def get_proxy_from_redis(proxy_source_url, **kwargs):
@return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'http://xxx.xxx.xxx.xxx:xxx'}]
"""
- redis_conn = redis.StrictRedis.from_url(proxy_source_url)
+ redis_conn = redis.Redis.from_url(proxy_source_url)
key = kwargs.get("redis_proxies_key")
assert key, "从redis中获取代理 需要指定 redis_proxies_key"
proxies = redis_conn.zrange(key, 0, -1)
diff --git a/feapder/requirements.txt b/feapder/requirements.txt
index 21717674..888d27f2 100644
--- a/feapder/requirements.txt
+++ b/feapder/requirements.txt
@@ -1,21 +1,20 @@
better-exceptions>=0.2.2
-DBUtils>=2.0
-parsel>=1.5.2
+DBUtils>=3.0
+parsel>=1.8.1
PyExecJS>=1.5.1
-pymongo>=3.10.1
-PyMySQL>=0.9.3
-redis>=2.10.6,<4.0.0
-requests>=2.22.0
-selenium>=3.141.0
-bs4>=0.0.1
-ipython>=7.14.0
-bitarray>=1.5.3
-redis-py-cluster>=2.1.0
-cryptography>=3.3.2
-urllib3>=1.25.8
+pymongo>=4.0.0
+PyMySQL>=1.1.0
+redis>=5.0.0,<9.0.0
+requests>=2.31.0
+selenium>=4.10.0
+beautifulsoup4>=4.12.0
+ipython>=8.0.0
+bitarray>=2.8.0
+cryptography>=41.0.0
+urllib3>=2.0.0,<3.0.0
loguru>=0.5.3
influxdb>=5.3.1
pyperclip>=1.8.2
webdriver-manager>=4.0.0
terminal-layout>=2.1.3
-playwright
\ No newline at end of file
+playwright>=1.40.0
diff --git a/feapder/utils/webdriver/selenium_driver.py b/feapder/utils/webdriver/selenium_driver.py
index 9f46d54b..06eb5493 100644
--- a/feapder/utils/webdriver/selenium_driver.py
+++ b/feapder/utils/webdriver/selenium_driver.py
@@ -8,13 +8,13 @@
@email: boris_liu@foxmail.com
"""
+import inspect
import json
import logging
import os
from typing import Optional, Union, List
from selenium import webdriver
-from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
@@ -33,41 +33,7 @@ class SeleniumDriver(WebDriver, RemoteWebDriver):
PHANTOMJS = "PHANTOMJS"
FIREFOX = "FIREFOX"
- __CHROME_ATTRS__ = {
- "executable_path",
- "port",
- "options",
- "service_args",
- "desired_capabilities",
- "service_log_path",
- "chrome_options",
- "keep_alive",
- }
-
- __EDGE_ATTRS__ = __CHROME_ATTRS__
-
- __FIREFOX_ATTRS__ = {
- "firefox_profile",
- "firefox_binary",
- "timeout",
- "capabilities",
- "proxy",
- "executable_path",
- "options",
- "service_log_path",
- "firefox_options",
- "service_args",
- "desired_capabilities",
- "log_path",
- "keep_alive",
- }
- __PHANTOMJS_ATTRS__ = {
- "executable_path",
- "port",
- "desired_capabilities",
- "service_args",
- "service_log_path",
- }
+ __DRIVER_ATTRS__ = {"keep_alive"}
def __init__(self, xhr_url_regexes: list = None, **kwargs):
"""
@@ -131,41 +97,117 @@ def filter_kwargs(self, kwargs: dict, driver_attrs: set):
return data
+    def get_options(self, default_options, *option_keys):
+        for option_key in option_keys:
+            options = self._kwargs.get(option_key)
+            if options is not None:
+                return options
+
+        return default_options
+
+    def get_driver_kwargs(self):
+        return self.filter_kwargs(self._kwargs, self.__DRIVER_ATTRS__)
+
+    def apply_capabilities(self, options, *capability_keys):
+        for capability_key in capability_keys:
+            capabilities = self._kwargs.get(capability_key)
+            if not capabilities:
+                continue
+
+            for key, value in capabilities.items():
+                options.set_capability(key, value)
+
+        return options
+
+    def build_service(self, service_cls, driver_manager_cls=None):
+        service = self._kwargs.get("service")
+        if service is not None:
+            return service
+
+        service_kwargs = {}
+        service_args = self._kwargs.get("service_args")
+        if service_args is not None:
+            service_kwargs["service_args"] = service_args
+
+        port = self._kwargs.get("port")
+        if port is not None:
+            service_kwargs["port"] = port
+
+        log_path = self._kwargs.get("service_log_path")
+        if log_path is None:
+            log_path = self._kwargs.get("log_path")
+        if log_path is not None:
+            log_param = (
+                "log_output"
+                if "log_output" in inspect.signature(service_cls).parameters
+                else "log_path"
+            )
+            service_kwargs[log_param] = log_path
+
+        if self._executable_path:
+            return service_cls(self._executable_path, **service_kwargs)
+
+        if self._auto_install_driver and driver_manager_cls is not None:
+            return service_cls(driver_manager_cls().install(), **service_kwargs)
+
+        if service_kwargs:
+            return service_cls(**service_kwargs)
+
+        return None
+
+    def create_driver(self, driver_cls, options, service):
+        kwargs = self.get_driver_kwargs()
+        if service is not None:
+            kwargs["service"] = service
+
+        return driver_cls(options=options, **kwargs)
+
+    def get_proxy(self):
+        return self._proxy() if callable(self._proxy) else self._proxy
+
+    def get_user_agent(self):
+        return self._user_agent() if callable(self._user_agent) else self._user_agent
+
def get_driver(self):
return self.driver
def firefox_driver(self):
- if webdriver.__version__ >= "4.0.0":
- raise Exception(
- f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器"
- )
+ from selenium.webdriver.firefox.service import Service
- firefox_profile = webdriver.FirefoxProfile()
- firefox_options = webdriver.FirefoxOptions()
- firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
- try:
- from selenium.webdriver.firefox.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
+ firefox_options = self.get_options(
+ webdriver.FirefoxOptions(), "options", "firefox_options"
+ )
+ firefox_profile = self._kwargs.get("firefox_profile")
+ if firefox_profile is not None:
+ firefox_options.profile = firefox_profile
+ firefox_binary = self._kwargs.get("firefox_binary")
+ if firefox_binary is not None:
+ firefox_options.binary_location = (
+ getattr(firefox_binary, "path", None)
+ or getattr(firefox_binary, "_start_cmd", None)
+ or firefox_binary
+ )
+ self.apply_capabilities(firefox_options, "desired_capabilities", "capabilities")
if self._proxy:
- proxy = self._proxy() if callable(self._proxy) else self._proxy
- firefox_capabilities["marionette"] = True
- firefox_capabilities["proxy"] = {
- "proxyType": "MANUAL",
- "httpProxy": proxy,
- "ftpProxy": proxy,
- "sslProxy": proxy,
- }
+ proxy = self.get_proxy()
+ firefox_options.set_capability(
+ "proxy",
+ {
+ "proxyType": "MANUAL",
+ "httpProxy": proxy,
+ "ftpProxy": proxy,
+ "sslProxy": proxy,
+ },
+ )
if self._user_agent:
- firefox_profile.set_preference(
- "general.useragent.override",
- self._user_agent() if callable(self._user_agent) else self._user_agent,
+ firefox_options.set_preference(
+ "general.useragent.override", self.get_user_agent()
)
if not self._load_images:
- firefox_profile.set_preference("permissions.default.image", 2)
+ firefox_options.set_preference("permissions.default.image", 2)
if self._headless:
firefox_options.add_argument("--headless")
@@ -176,25 +218,8 @@ def firefox_driver(self):
for arg in self._custom_argument:
firefox_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__)
-
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- kwargs.update(executable_path=GeckoDriverManager().install())
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- kwargs.update(service=Service(GeckoDriverManager().install()))
-
- driver = webdriver.Firefox(
- capabilities=firefox_capabilities,
- options=firefox_options,
- firefox_profile=firefox_profile,
- **kwargs,
- )
+ service = self.build_service(Service, GeckoDriverManager)
+ driver = self.create_driver(webdriver.Firefox, firefox_options, service)
if self._window_size:
driver.set_window_size(*self._window_size)
@@ -202,31 +227,22 @@ def firefox_driver(self):
return driver
def chrome_driver(self):
- chrome_options = webdriver.ChromeOptions()
+ chrome_options = self.get_options(
+ webdriver.ChromeOptions(), "options", "chrome_options"
+ )
# 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option("useAutomationExtension", False)
# docker 里运行需要
chrome_options.add_argument("--no-sandbox")
- try:
- from selenium.webdriver.chrome.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
+ from selenium.webdriver.chrome.service import Service
+
+ self.apply_capabilities(chrome_options, "desired_capabilities")
if self._proxy:
- chrome_options.add_argument(
- "--proxy-server={}".format(
- self._proxy() if callable(self._proxy) else self._proxy
- )
- )
+ chrome_options.add_argument("--proxy-server={}".format(self.get_proxy()))
if self._user_agent:
- chrome_options.add_argument(
- "user-agent={}".format(
- self._user_agent()
- if callable(self._user_agent)
- else self._user_agent
- )
- )
+ chrome_options.add_argument("user-agent={}".format(self.get_user_agent()))
if not self._load_images:
chrome_options.add_experimental_option(
"prefs", {"profile.managed_default_content_settings.images": 2}
@@ -254,19 +270,8 @@ def chrome_driver(self):
for arg in self._custom_argument:
chrome_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__)
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- kwargs.update(executable_path=ChromeDriverManager().install())
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- kwargs.update(service=Service(ChromeDriverManager().install()))
-
- driver = webdriver.Chrome(options=chrome_options, **kwargs)
+ service = self.build_service(Service, ChromeDriverManager)
+ driver = self.create_driver(webdriver.Chrome, chrome_options, service)
# 隐藏浏览器特征
if self._use_stealth_js:
@@ -293,44 +298,30 @@ def chrome_driver(self):
)
if self._download_path:
- driver.command_executor._commands["send_command"] = (
- "POST",
- "/session/$sessionId/chromium/send_command",
+ driver.execute_cdp_cmd(
+ "Page.setDownloadBehavior",
+ {"behavior": "allow", "downloadPath": self._download_path},
)
- params = {
- "cmd": "Page.setDownloadBehavior",
- "params": {"behavior": "allow", "downloadPath": self._download_path},
- }
- driver.execute("send_command", params)
return driver
def edge_driver(self):
- edge_options = webdriver.EdgeOptions()
+ edge_options = self.get_options(
+ webdriver.EdgeOptions(), "options", "edge_options"
+ )
# 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
edge_options.add_experimental_option("useAutomationExtension", False)
# docker 里运行需要
edge_options.add_argument("--no-sandbox")
- try:
- from selenium.webdriver.edge.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
+ from selenium.webdriver.edge.service import Service
+
+ self.apply_capabilities(edge_options, "desired_capabilities")
if self._proxy:
- edge_options.add_argument(
- "--proxy-server={}".format(
- self._proxy() if callable(self._proxy) else self._proxy
- )
- )
+ edge_options.add_argument("--proxy-server={}".format(self.get_proxy()))
if self._user_agent:
- edge_options.add_argument(
- "user-agent={}".format(
- self._user_agent()
- if callable(self._user_agent)
- else self._user_agent
- )
- )
+ edge_options.add_argument("user-agent={}".format(self.get_user_agent()))
if not self._load_images:
edge_options.add_experimental_option(
"prefs", {"profile.managed_default_content_settings.images": 2}
@@ -358,19 +349,8 @@ def edge_driver(self):
for arg in self._custom_argument:
edge_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__)
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- raise NotImplementedError("edge not support auto install driver")
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- raise NotImplementedError("edge not support auto install driver")
-
- driver = webdriver.Edge(options=edge_options, **kwargs)
+ service = self.build_service(Service)
+ driver = self.create_driver(webdriver.Edge, edge_options, service)
# 隐藏浏览器特征
if self._use_stealth_js:
@@ -397,58 +377,19 @@ def edge_driver(self):
)
if self._download_path:
- driver.command_executor._commands["send_command"] = (
- "POST",
- "/session/$sessionId/chromium/send_command",
+ driver.execute_cdp_cmd(
+ "Page.setDownloadBehavior",
+ {"behavior": "allow", "downloadPath": self._download_path},
)
- params = {
- "cmd": "Page.setDownloadBehavior",
- "params": {"behavior": "allow", "downloadPath": self._download_path},
- }
- driver.execute("send_command", params)
return driver
def phantomjs_driver(self):
- import warnings
-
- warnings.filterwarnings("ignore")
-
- service_args = []
- dcap = DesiredCapabilities.PHANTOMJS
-
- if self._proxy:
- service_args.append(
- "--proxy=%s" % self._proxy() if callable(self._proxy) else self._proxy
- )
- if self._user_agent:
- dcap["phantomjs.page.settings.userAgent"] = (
- self._user_agent() if callable(self._user_agent) else self._user_agent
- )
- if not self._load_images:
- service_args.append("--load-images=no")
-
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- service_args.append(arg)
-
- kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__)
-
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
-
- driver = webdriver.PhantomJS(
- service_args=service_args, desired_capabilities=dcap, **kwargs
+        raise NotImplementedError(
+            "PhantomJS is not supported by Selenium 4. "
+            "Please use CHROME, EDGE, or FIREFOX."
)
- if self._window_size:
- driver.set_window_size(self._window_size[0], self._window_size[1])
-
- del warnings
-
- return driver
-
@property
def domain(self):
return tools.get_domain(self.url or self.driver.current_url)
diff --git a/setup.py b/setup.py
index cf4fe542..14542b97 100644
--- a/setup.py
+++ b/setup.py
@@ -13,8 +13,8 @@
import setuptools
-if version_info < (3, 6, 0):
-    raise SystemExit("Sorry! feapder requires python 3.6.0 or later.")
+if version_info < (3, 9, 0):
+    raise SystemExit("Sorry! feapder requires python 3.9.0 or later.")
with open(join(dirname(__file__), "feapder/VERSION"), "rb") as fh:
version = fh.read().decode("ascii").strip()
@@ -35,15 +35,15 @@
requires = [
"better-exceptions>=0.2.2",
- "DBUtils>=2.0",
- "parsel>=1.5.2",
- "PyMySQL>=0.9.3",
- "redis>=2.10.6,<4.0.0",
- "requests>=2.22.0",
- "bs4>=0.0.1",
- "ipython>=7.14.0",
- "cryptography>=3.3.2",
- "urllib3>=1.25.8",
+ "DBUtils>=3.0",
+ "parsel>=1.8.1",
+ "PyMySQL>=1.1.0",
+ "redis>=5.0.0,<9.0.0",
+ "requests>=2.31.0",
+ "beautifulsoup4>=4.12.0",
+ "ipython>=8.0.0",
+ "cryptography>=41.0.0",
+ "urllib3>=2.0.0,<3.0.0",
"loguru>=0.5.3",
"influxdb>=5.3.1",
"pyperclip>=1.8.2",
@@ -52,15 +52,14 @@
render_requires = [
"webdriver-manager>=4.0.0",
- "playwright",
- "selenium>=3.141.0",
+ "playwright>=1.40.0",
+ "selenium>=4.10.0",
]
all_requires = [
- "bitarray>=1.5.3",
+ "bitarray>=2.8.0",
"PyExecJS>=1.5.1",
- "pymongo>=3.10.1",
- "redis-py-cluster>=2.1.0",
+ "pymongo>=4.0.0",
] + render_requires
setuptools.setup(
@@ -69,7 +68,7 @@
author="Boris",
license="MIT",
author_email="feapder@qq.com",
- python_requires=">=3.6",
+ python_requires=">=3.9",
description="feapder是一款支持分布式、批次采集、数据防丢、报警丰富的python爬虫框架",
long_description=long_description,
long_description_content_type="text/markdown",
diff --git a/tests/test_dependency_modernization.py b/tests/test_dependency_modernization.py
new file mode 100644
index 00000000..e9a73b2a
--- /dev/null
+++ b/tests/test_dependency_modernization.py
@@ -0,0 +1,115 @@
+import inspect
+
+from feapder.db.redisdb import RedisDB
+from feapder.utils.webdriver.selenium_driver import SeleniumDriver
+
+
+class OldStyleService:
+    def __init__(
+        self, executable_path=None, port=0, service_args=None, log_path=None, **kwargs
+    ):
+        self.executable_path = executable_path
+        self.port = port
+        self.service_args = service_args
+        self.log_path = log_path
+
+
+class NewStyleService:
+    def __init__(
+        self, executable_path=None, port=0, service_args=None, log_output=None, **kwargs
+    ):
+        self.executable_path = executable_path
+        self.port = port
+        self.service_args = service_args
+        self.log_output = log_output
+
+
+class FakeBrowser:
+    def set_window_size(self, *args):
+        self.window_size = args
+
+
+def make_selenium_driver(**kwargs):
+    driver = object.__new__(SeleniumDriver)
+    driver._kwargs = kwargs
+    driver._executable_path = kwargs.pop("executable_path", "/tmp/driver")
+    driver._auto_install_driver = False
+    driver._proxy = None
+    driver._user_agent = None
+    driver._load_images = True
+    driver._headless = False
+    driver._custom_argument = None
+    driver._window_size = None
+    return driver
+
+
+def test_selenium_service_log_path_supports_old_and_new_service_api():
+    driver = make_selenium_driver(
+        service_log_path="/tmp/webdriver.log", service_args=["--verbose"], port=1234
+    )
+
+    old_service = driver.build_service(OldStyleService)
+    new_service = driver.build_service(NewStyleService)
+
+    assert old_service.executable_path == "/tmp/driver"
+    assert old_service.service_args == ["--verbose"]
+    assert old_service.port == 1234
+    assert old_service.log_path == "/tmp/webdriver.log"
+    assert new_service.executable_path == "/tmp/driver"
+    assert new_service.service_args == ["--verbose"]
+    assert new_service.port == 1234
+    assert new_service.log_output == "/tmp/webdriver.log"
+
+
+def test_selenium_firefox_binary_maps_to_options_binary_location():
+    driver = make_selenium_driver(firefox_binary="/tmp/firefox")
+    captured = {}
+
+    def create_driver(driver_cls, options, service):
+        captured["options"] = options
+        captured["service"] = service
+        return FakeBrowser()
+
+    driver.create_driver = create_driver
+    driver.build_service = lambda *args, **kwargs: None
+
+    assert driver.firefox_driver() is not None
+    assert captured["options"].binary_location == "/tmp/firefox"
+    assert captured["service"] is None
+
+
+def test_selenium_driver_kwargs_keep_public_constructor_args_internal():
+    driver = make_selenium_driver(
+        keep_alive=False,
+        executable_path="/tmp/driver",
+        desired_capabilities={"acceptInsecureCerts": True},
+        service_args=["--verbose"],
+    )
+
+    assert driver.get_driver_kwargs() == {"keep_alive": False}
+
+
+def test_redisdb_public_api_signatures_are_preserved():
+    expected = {
+        "__init__": [
+            "self",
+            "ip_ports",
+            "db",
+            "user_pass",
+            "url",
+            "decode_responses",
+            "service_name",
+            "max_connections",
+            "kwargs",
+        ],
+        "from_url": ["url"],
+        "zadd": ["self", "table", "values", "prioritys"],
+        "zget": ["self", "table", "count", "is_pop"],
+        "zexists": ["self", "table", "values"],
+        "setbit": ["self", "table", "offsets", "values"],
+        "getbit": ["self", "table", "offsets"],
+    }
+
+    for method_name, parameters in expected.items():
+        signature = inspect.signature(getattr(RedisDB, method_name))
+        assert list(signature.parameters) == parameters
diff --git a/tests/test_parser_control_runtime.py b/tests/test_parser_control_runtime.py
new file mode 100644
index 00000000..bf9b76f4
--- /dev/null
+++ b/tests/test_parser_control_runtime.py
@@ -0,0 +1,306 @@
+import pytest
+
+import feapder.setting as setting
+from feapder.core.parser_control import AirSpiderParserControl, ParserControl
+from feapder.network.request import Request
+from feapder.network.item import Item
+
+
+class FakeResponse:
+    browser = None
+
+    def __str__(self):
+        return ""
+
+
+class FakeParser:
+    name = "FakeParser"
+
+    def __init__(self):
+        self.validated = []
+        self.parsed = []
+
+    def download_midware(self, request):
+        return None
+
+    def validate(self, request, response):
+        self.validated.append((request, response))
+        return True
+
+    def parse(self, request, response):
+        self.parsed.append((request, response))
+        return [Item(title="ok")]
+
+    def exception_request(self, request, response, e):
+        return [request]
+
+    def failed_request(self, request, response, e):
+        return [request]
+
+
+class FakeParserRecordingExceptions(FakeParser):
+    def __init__(self):
+        super().__init__()
+        self.exception_requests = []
+
+    def exception_request(self, request, response, e):
+        self.exception_requests.append((request, response, e))
+        return [request]
+
+
+class FakeRequestBuffer:
+    def __init__(self):
+        self.requests = []
+        self.deleted = []
+        self.failed = []
+
+    def put_request(self, request):
+        self.requests.append(request)
+
+    def put_del_request(self, request):
+        self.deleted.append(request)
+
+    def put_failed_request(self, request):
+        self.failed.append(request)
+
+
+class FakeItemBuffer:
+    def __init__(self):
+        self.items = []
+
+    def put_item(self, item):
+        self.items.append(item)
+
+
+def return_none_response():
+    return None
+
+
+def build_control():
+    control = object.__new__(ParserControl)
+    control._parsers = []
+    control._request_buffer = FakeRequestBuffer()
+    control._item_buffer = FakeItemBuffer()
+    return control
+
+
+def build_air_control():
+    control = object.__new__(AirSpiderParserControl)
+    control._parsers = []
+    control._request_buffer = FakeRequestBuffer()
+    control._item_buffer = FakeItemBuffer()
+    return control
+
+
+def test_find_parser_by_request_parser_name():
+    parser = FakeParser()
+    control = build_control()
+    control._parsers = [parser]
+
+    assert control._find_parser(Request(parser_name="FakeParser")) is parser
+
+
+def test_find_parser_returns_none_for_missing_parser():
+    control = build_control()
+    control._parsers = [FakeParser()]
+
+    assert control._find_parser(Request(parser_name="OtherParser")) is None
+
+
+def test_run_callback_uses_named_callback():
+    parser = FakeParser()
+    parser.custom = lambda request, response: [Item(title="custom")]
+    control = build_control()
+    request = Request(callback="custom")
+    response = FakeResponse()
+
+    results = control._run_callback(parser, request, response)
+
+    assert len(results) == 1
+    assert results[0]["title"] == "custom"
+
+
+def test_finish_request_prefers_item_buffer_for_item_results():
+    control = build_control()
+
+    control._finish_request(
+        request_redis="raw-request",
+        del_request_redis_after_item_to_db=True,
+        del_request_redis_after_request_to_db=True,
+    )
+
+    assert control._item_buffer.items == ["raw-request"]
+    assert control._request_buffer.deleted == []
+
+
+def test_finish_request_deletes_via_request_buffer_for_request_results():
+    control = build_control()
+
+    control._finish_request(
+        request_redis="raw-request",
+        del_request_redis_after_item_to_db=False,
+        del_request_redis_after_request_to_db=True,
+    )
+
+    assert control._item_buffer.items == []
+    assert control._request_buffer.deleted == ["raw-request"]
+
+
+def test_finish_request_deletes_unclaimed_request_by_default():
+    control = build_control()
+
+    control._finish_request(
+        request_redis="raw-request",
+        del_request_redis_after_item_to_db=False,
+        del_request_redis_after_request_to_db=False,
+    )
+
+    assert control._request_buffer.deleted == ["raw-request"]
+
+
+def test_deal_request_uses_request_returned_by_download_middleware(monkeypatch):
+    parser = FakeParser()
+    control = build_control()
+    control._parsers = [parser]
+    response = FakeResponse()
+    replacement_request = Request(
+        "https://replacement.example.com",
+        parser_name="FakeParser",
+        auto_request=True,
+    )
+
+    def replace_request(request):
+        return replacement_request, response
+
+    request = Request(
+        "https://example.com",
+        parser_name="FakeParser",
+        download_midware=replace_request,
+    )
+
+    monkeypatch.setattr(control, "record_download_status", lambda status, spider: None)
+    monkeypatch.setattr(control, "_sleep_after_request", lambda: None)
+
+    control.deal_request({"request_obj": request, "request_redis": "raw-request"})
+
+    assert parser.validated == [(replacement_request, response)]
+    assert parser.parsed == [(replacement_request, response)]
+
+
+def test_deal_request_handles_fetch_exception_on_replacement_request(monkeypatch):
+    parser = FakeParserRecordingExceptions()
+    control = build_control()
+    control._parsers = [parser]
+    replacement_request = Request(
+        "https://replacement.example.com",
+        parser_name="FakeParser",
+        auto_request=True,
+    )
+
+    def raise_fetch_error():
+        raise RuntimeError("replacement fetch failed")
+
+    replacement_request.get_response = raise_fetch_error
+
+    def replace_request(request):
+        return replacement_request
+
+    request = Request(
+        "https://example.com",
+        parser_name="FakeParser",
+        download_midware=replace_request,
+    )
+
+    monkeypatch.setattr(control, "record_download_status", lambda status, spider: None)
+    monkeypatch.setattr(control, "_sleep_after_request", lambda: None)
+    monkeypatch.setattr(setting, "LOG_LEVEL", "INFO")
+
+    control.deal_request({"request_obj": request, "request_redis": None})
+
+    assert parser.exception_requests
+    assert parser.exception_requests[0][0] is replacement_request
+
+
+def test_deal_request_dispatches_item_and_marks_request_for_item_delete(monkeypatch):
+    parser = FakeParser()
+    control = build_control()
+    control._parsers = [parser]
+    response = FakeResponse()
+    request = Request(
+        "https://example.com",
+        parser_name="FakeParser",
+        auto_request=False,
+    )
+
+    monkeypatch.setattr(control, "record_download_status", lambda status, spider: None)
+    monkeypatch.setattr(control, "_sleep_after_request", lambda: None)
+
+    control.deal_request({"request_obj": request, "request_redis": "raw-request"})
+
+    assert len(control._item_buffer.items) == 2
+    assert isinstance(control._item_buffer.items[0], Item)
+    assert control._item_buffer.items[1] == "raw-request"
+    assert control._request_buffer.deleted == []
+
+
+def test_air_parser_control_uses_direct_sync_request_payload():
+    control = build_air_control()
+    request = Request("https://example.com")
+
+    assert control._make_sync_request_payload(request) is request
+
+
+def test_air_parser_control_dispatches_item_without_redis_finish(monkeypatch):
+    parser = FakeParser()
+    control = build_air_control()
+    control._parsers = [parser]
+    request = Request(
+        "https://example.com",
+        parser_name="FakeParser",
+        auto_request=False,
+    )
+
+    monkeypatch.setattr(control, "record_download_status", lambda status, spider: None)
+    monkeypatch.setattr(control, "_sleep_after_request", lambda: None)
+
+    control.deal_request(request)
+
+    assert len(control._item_buffer.items) == 1
+    assert isinstance(control._item_buffer.items[0], Item)
+    assert control._request_buffer.deleted == []
+
+
+def test_air_parser_control_allows_none_download_response(monkeypatch):
+    parser = FakeParserRecordingExceptions()
+    control = build_air_control()
+    control._parsers = [parser]
+    request = Request(
+        "https://example.com",
+        parser_name="FakeParser",
+        auto_request=True,
+    )
+    request.get_response = return_none_response
+
+    monkeypatch.setattr(control, "record_download_status", lambda status, spider: None)
+    monkeypatch.setattr(control, "_sleep_after_request", lambda: None)
+
+    control.deal_request(request)
+
+    assert parser.exception_requests == []
+    assert parser.validated == [(request, None)]
+    assert parser.parsed == [(request, None)]
+
+
+def test_air_parser_control_finish_request_rejects_redis_deletion_inputs():
+    control = build_air_control()
+
+    control._finish_request(None, False, False)
+
+    with pytest.raises(AssertionError):
+        control._finish_request("raw-request", False, False)
+
+    with pytest.raises(AssertionError):
+        control._finish_request(None, True, False)
+
+    with pytest.raises(AssertionError):
+        control._finish_request(None, False, True)
diff --git a/tests/test_result_dispatcher.py b/tests/test_result_dispatcher.py
new file mode 100644
index 00000000..cd4e29df
--- /dev/null
+++ b/tests/test_result_dispatcher.py
@@ -0,0 +1,169 @@
+import pytest
+
+from feapder.core.result_dispatcher import ResultDispatcher
+from feapder.network.item import Item, UpdateItem
+from feapder.network.request import Request
+
+
+class FakeParser:
+    name = "FakeParser"
+
+
+class FakeRequestBuffer:
+    def __init__(self):
+        self.requests = []
+
+    def put_request(self, request):
+        self.requests.append(request)
+
+
+class FakeItemBuffer:
+    def __init__(self):
+        self.items = []
+
+    def put_item(self, item):
+        self.items.append(item)
+
+
+def test_dispatcher_routes_async_request_and_sets_parser_name():
+    request_buffer = FakeRequestBuffer()
+    item_buffer = FakeItemBuffer()
+    calls = []
+    dispatcher = ResultDispatcher(
+        request_buffer=request_buffer,
+        item_buffer=item_buffer,
+        deal_request=calls.append,
+        sync_request_factory=lambda request: {"request_obj": request, "request_redis": None},
+    )
+    next_request = Request("https://example.com")
+
+    result = dispatcher.dispatch(
+        parser=FakeParser(),
+        request=Request("https://root.example.com", callback="parse"),
+        results=[next_request],
+    )
+
+    assert next_request.parser_name == "FakeParser"
+    assert request_buffer.requests == [next_request]
+    assert item_buffer.items == []
+    assert calls == []
+    assert result.del_request_redis_after_request_to_db is True
+    assert result.del_request_redis_after_item_to_db is False
+
+
+def test_dispatcher_routes_sync_request_to_deal_request():
+    request_buffer = FakeRequestBuffer()
+    item_buffer = FakeItemBuffer()
+    calls = []
+    dispatcher = ResultDispatcher(
+        request_buffer=request_buffer,
+        item_buffer=item_buffer,
+        deal_request=calls.append,
+        sync_request_factory=lambda request: {"request_obj": request, "request_redis": None},
+    )
+    next_request = Request("https://example.com", request_sync=True)
+
+    result = dispatcher.dispatch(
+        parser=FakeParser(),
+        request=Request("https://root.example.com"),
+        results=[next_request],
+    )
+
+    assert calls == [{"request_obj": next_request, "request_redis": None}]
+    assert request_buffer.requests == []
+    assert result.del_request_redis_after_request_to_db is False
+
+
+def test_dispatcher_routes_item_and_update_item_to_item_buffer():
+    request_buffer = FakeRequestBuffer()
+    item_buffer = FakeItemBuffer()
+    dispatcher = ResultDispatcher(
+        request_buffer=request_buffer,
+        item_buffer=item_buffer,
+        deal_request=lambda request: None,
+    )
+    item = Item(title="one")
+    update_item = UpdateItem(id=1, title="two")
+
+    result = dispatcher.dispatch(
+        parser=FakeParser(),
+        request=Request("https://root.example.com"),
+        results=[item, update_item],
+    )
+
+    assert item_buffer.items == [item, update_item]
+    assert request_buffer.requests == []
+    assert result.del_request_redis_after_item_to_db is True
+
+
+def test_dispatcher_routes_callable_after_item_to_item_buffer():
+    request_buffer = FakeRequestBuffer()
+    item_buffer = FakeItemBuffer()
+    dispatcher = ResultDispatcher(
+        request_buffer=request_buffer,
+        item_buffer=item_buffer,
+        deal_request=lambda request: None,
+    )
+    callback = lambda: None
+
+    result = dispatcher.dispatch(
+        parser=FakeParser(),
+        request=Request("https://root.example.com"),
+        results=[Item(title="one"), callback],
+    )
+
+    assert item_buffer.items[-1] is callback
+    assert request_buffer.requests == []
+    assert result.del_request_redis_after_item_to_db is True
+
+
+def test_dispatcher_routes_callable_without_prior_item_to_request_buffer():
+    request_buffer = FakeRequestBuffer()
+    item_buffer = FakeItemBuffer()
+    dispatcher = ResultDispatcher(
+        request_buffer=request_buffer,
+        item_buffer=item_buffer,
+        deal_request=lambda request: None,
+    )
+    callback = lambda: None
+
+    result = dispatcher.dispatch(
+        parser=FakeParser(),
+        request=Request("https://root.example.com"),
+        results=[callback],
+    )
+
+    assert request_buffer.requests == [callback]
+    assert item_buffer.items == []
+    assert result.del_request_redis_after_request_to_db is True
+
+
+def test_dispatcher_rejects_callable_when_disabled():
+    dispatcher = ResultDispatcher(
+        request_buffer=FakeRequestBuffer(),
+        item_buffer=FakeItemBuffer(),
+        deal_request=lambda request: None,
+        allow_callable=False,
+    )
+
+    with pytest.raises(TypeError, match="FakeParser.parse result expect Request or Item"):
+        dispatcher.dispatch(
+            parser=FakeParser(),
+            request=Request("https://root.example.com"),
+            results=[lambda: None],
+        )
+
+
+def test_dispatcher_rejects_invalid_result_type():
+    dispatcher = ResultDispatcher(
+        request_buffer=FakeRequestBuffer(),
+        item_buffer=FakeItemBuffer(),
+        deal_request=lambda request: None,
+    )
+
+    with pytest.raises(TypeError, match="FakeParser.parse result expect Request"):
+        dispatcher.dispatch(
+            parser=FakeParser(),
+            request=Request("https://root.example.com"),
+            results=[object()],
+        )
diff --git a/tests/test_runtime_state.py b/tests/test_runtime_state.py
new file mode 100644
index 00000000..0c8a9bc0
--- /dev/null
+++ b/tests/test_runtime_state.py
@@ -0,0 +1,56 @@
+from feapder.core.runtime_state import RuntimeState
+
+
+def test_runtime_state_starts_idle_and_running():
+    state = RuntimeState()
+
+    assert state.is_idle is True
+    assert state.is_stop_requested is False
+    assert state.busy_count == 0
+
+
+def test_runtime_state_tracks_busy_count():
+    state = RuntimeState()
+
+    state.mark_busy()
+    state.mark_busy()
+
+    assert state.is_idle is False
+    assert state.busy_count == 2
+
+    state.mark_idle()
+    state.mark_idle()
+
+    assert state.is_idle is True
+    assert state.busy_count == 0
+
+
+def test_runtime_state_does_not_go_negative():
+    state = RuntimeState()
+
+    state.mark_idle()
+
+    assert state.busy_count == 0
+    assert state.is_idle is True
+
+
+def test_runtime_state_busy_context_resets_on_exception():
+    state = RuntimeState()
+
+    try:
+        with state.busy():
+            assert state.is_idle is False
+            raise RuntimeError("boom")
+    except RuntimeError:
+        pass
+
+    assert state.is_idle is True
+
+
+def test_runtime_state_stop_request_is_sticky():
+    state = RuntimeState()
+
+    state.request_stop()
+    state.request_stop()
+
+    assert state.is_stop_requested is True
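
The tests above pin down a small contract: a busy counter that never drops below zero, a `busy()` context manager that restores idleness even when the body raises, and a stop flag that stays set once requested. The sketch below is one way to satisfy that contract. It is not the PR's `feapder/core/runtime_state.py`; the class name `RuntimeStateSketch` and the use of a `threading.Lock` are assumptions made for illustration.

```python
import threading
from contextlib import contextmanager


class RuntimeStateSketch:
    """Hypothetical stand-in for RuntimeState, derived only from the tests."""

    def __init__(self):
        self._lock = threading.Lock()  # assumption: state is shared across threads
        self._busy_count = 0
        self._stop_requested = False

    @property
    def busy_count(self):
        with self._lock:
            return self._busy_count

    @property
    def is_idle(self):
        return self.busy_count == 0

    @property
    def is_stop_requested(self):
        with self._lock:
            return self._stop_requested

    def mark_busy(self):
        with self._lock:
            self._busy_count += 1

    def mark_idle(self):
        with self._lock:
            # Clamp at zero so an unmatched mark_idle() cannot go negative.
            self._busy_count = max(0, self._busy_count - 1)

    @contextmanager
    def busy(self):
        self.mark_busy()
        try:
            yield self
        finally:
            # Runs on normal exit and on exceptions alike.
            self.mark_idle()

    def request_stop(self):
        with self._lock:
            # Setting the flag repeatedly has the same effect as setting it once.
            self._stop_requested = True
```

Pairing `mark_busy()` and `mark_idle()` inside a context manager means a flush that raises cannot leave the component looking busy, which is what the `*_resets_flag_when_*_raises` tests in `tests/test_runtime_status.py` check for the buffers.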
diff --git a/tests/test_runtime_status.py b/tests/test_runtime_status.py
new file mode 100644
index 00000000..c0701405
--- /dev/null
+++ b/tests/test_runtime_status.py
@@ -0,0 +1,240 @@
+import collections
+from queue import Queue
+
+from feapder.buffer.item_buffer import ItemBuffer
+from feapder.buffer.request_buffer import RequestBuffer
+from feapder.core.collector import Collector
+from feapder.core.scheduler import Scheduler
+from feapder.core.runtime_state import RuntimeState
+from feapder.network.item import Item
+
+
+class FakeRedis:
+    def __init__(self, count=0):
+        self.count = count
+
+    def zget_count(self, table):
+        return self.count
+
+
+class FakeDoneComponent:
+    def __init__(self, idle=True):
+        self._idle = idle
+
+    def is_idle(self):
+        return self._idle
+
+
+def build_collector(local_count=0, backend_count=0, busy=False):
+    collector = object.__new__(Collector)
+    collector._state = RuntimeState()
+    collector._db = FakeRedis(backend_count)
+    collector._tab_requests = "test:z_requests"
+    collector._todo_requests = Queue()
+    collector._is_collector_task = busy
+    for i in range(local_count):
+        collector._todo_requests.put({"request_obj": i, "request_redis": str(i)})
+    return collector
+
+
+def build_request_buffer(request_count=0, delete_count=0, flushing=False):
+    buffer = object.__new__(RequestBuffer)
+    buffer._state = RuntimeState()
+    buffer._requests_deque = collections.deque(range(request_count))
+    buffer._del_requests_deque = collections.deque(range(delete_count))
+    buffer._is_adding_to_db = flushing
+    return buffer
+
+
+def build_item_buffer(item_count=0, flushing=False):
+    buffer = object.__new__(ItemBuffer)
+    buffer._state = RuntimeState()
+    buffer._items_queue = Queue()
+    buffer._is_adding_to_db = flushing
+    for i in range(item_count):
+        buffer._items_queue.put(i)
+    return buffer
+
+
+def test_collector_pending_count_includes_local_queue_first():
+    collector = build_collector(local_count=2, backend_count=5)
+
+    assert collector.pending_count() == 2
+    assert collector.is_idle() is False
+
+
+def test_collector_pending_count_uses_backend_when_local_empty():
+    collector = build_collector(local_count=0, backend_count=3)
+
+    assert collector.pending_count() == 3
+    assert collector.is_idle() is False
+
+
+def test_collector_idle_when_not_collecting_and_empty():
+    collector = build_collector(local_count=0, backend_count=0, busy=False)
+
+    assert collector.pending_count() == 0
+    assert collector.is_idle() is True
+
+
+def test_request_buffer_pending_count_includes_writes_and_deletes():
+    buffer = build_request_buffer(request_count=2, delete_count=1)
+
+    assert buffer.pending_count() == 3
+    assert buffer.is_idle() is False
+
+
+def test_request_buffer_idle_when_empty_and_not_flushing():
+    buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False)
+
+    assert buffer.pending_count() == 0
+    assert buffer.is_idle() is True
+
+
+def test_item_buffer_idle_when_queue_empty_and_not_flushing():
+    buffer = build_item_buffer(item_count=0, flushing=False)
+
+    assert buffer.pending_count() == 0
+    assert buffer.is_idle() is True
+
+
+def test_item_buffer_not_idle_while_flushing():
+    buffer = build_item_buffer(item_count=0, flushing=True)
+
+    assert buffer.is_idle() is False
+
+
+def test_scheduler_uses_component_idle_methods(monkeypatch):
+    monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None)
+    scheduler = object.__new__(Scheduler)
+    scheduler._collector = FakeDoneComponent(idle=True)
+    scheduler._parser_controls = [FakeDoneComponent(idle=True)]
+    scheduler._item_buffer = FakeDoneComponent(idle=True)
+    scheduler._request_buffer = FakeDoneComponent(idle=True)
+
+    assert scheduler.all_thread_is_done() is True
+
+
+def test_scheduler_waits_for_busy_parser(monkeypatch):
+    monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None)
+    scheduler = object.__new__(Scheduler)
+    scheduler._collector = FakeDoneComponent(idle=True)
+    scheduler._parser_controls = [FakeDoneComponent(idle=False)]
+    scheduler._item_buffer = FakeDoneComponent(idle=True)
+    scheduler._request_buffer = FakeDoneComponent(idle=True)
+
+    assert scheduler.all_thread_is_done() is False
+
+
+def test_component_run_resets_stop_state_before_loop(monkeypatch):
+    collector = build_collector(local_count=0, backend_count=0)
+    collector._state.request_stop()
+    calls = {"count": 0}
+
+    def stop_after_one_input():
+        calls["count"] += 1
+        collector._state.request_stop()
+
+    monkeypatch.setattr(collector, "_Collector__input_data", stop_after_one_input)
+
+    collector.run()
+
+    assert calls["count"] == 1
+    assert collector.is_stopped() is True
+
+
+def test_request_buffer_not_idle_during_delete_only_flush():
+    class InspectingDB:
+        def __init__(self, buffer):
+            self.buffer = buffer
+            self.idle_during_zrem = None
+
+        def zrem(self, table, values):
+            self.idle_during_zrem = self.buffer.is_idle()
+
+    buffer = build_request_buffer(request_count=0, delete_count=1, flushing=False)
+    buffer._table_request = "test:z_requests"
+    buffer._db = InspectingDB(buffer)
+
+    buffer.flush()
+
+    assert buffer._db.idle_during_zrem is False
+    assert buffer.is_idle() is True
+
+
+def test_request_buffer_marks_noop_flush_attempt_busy_while_checking_queue():
+    class InspectingEmptyDeque:
+        def __init__(self, buffer):
+            self.buffer = buffer
+            self.busy_observations = []
+
+        def __bool__(self):
+            self.busy_observations.append(self.buffer.is_adding_to_db())
+            return False
+
+        def __len__(self):
+            return 0
+
+        def popleft(self):
+            raise AssertionError("empty queue should not be popped")
+
+    buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False)
+    buffer._requests_deque = InspectingEmptyDeque(buffer)
+
+    buffer.flush()
+
+    assert buffer._requests_deque.busy_observations
+    assert all(buffer._requests_deque.busy_observations)
+    assert buffer.is_idle() is True
+
+
+def test_request_buffer_flush_resets_flag_when_zadd_raises():
+    class ExplodingDB:
+        def zadd(self, table, values, prioritys=0):
+            raise RuntimeError("write failed")
+
+    request = type(
+        "FakeRequest",
+        (),
+        {
+            "priority": 300,
+            "filter_repeat": False,
+            "url": "https://example.com",
+            "to_dict": {"url": "https://example.com"},
+        },
+    )()
+    buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False)
+    buffer._db = ExplodingDB()
+    buffer._table_request = "test:z_requests"
+    buffer._table_failed_request = "test:z_failed_requests"
+    buffer._requests_deque.append(request)
+
+    buffer.flush()
+
+    assert buffer.is_adding_to_db() is False
+
+
+def test_item_buffer_flush_resets_flag_when_export_raises(monkeypatch):
+    monkeypatch.setattr("feapder.buffer.item_buffer.setting.ITEM_FILTER_ENABLE", False)
+    item = Item(title="boom")
+    buffer = build_item_buffer(item_count=0, flushing=False)
+    buffer._items_queue.put(item)
+    buffer._redis_key = "test"
+    buffer._task_table = None
+    buffer._item_tables = {}
+    buffer._item_update_keys = {}
+    buffer._item_pipelines = {}
+    buffer._pipelines = []
+    buffer._have_mysql_pipeline = True
+    buffer._mysql_pipeline = None
+    buffer.export_retry_times = 0
+    buffer.export_falied_times = 0
+
+    def raise_export(table, datas, is_update=False, update_keys=(), used_pipelines=None):
+        raise RuntimeError("export failed")
+
+    buffer._ItemBuffer__export_to_db = raise_export
+
+    buffer.flush()
+
+    assert buffer.is_adding_to_db() is False
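
The `build_collector`, `build_request_buffer`, and `build_item_buffer` helpers in this file all use `object.__new__` to create an instance without running `__init__`, then set only the attributes the method under test reads. The standalone sketch below shows the same pattern in isolation. `ExpensiveComponent` and `build_component` are hypothetical names, not feapder classes.

```python
class ExpensiveComponent:
    """Hypothetical component whose __init__ needs a live backend."""

    def __init__(self):
        raise ConnectionError("would connect to a real backend here")

    def pending_count(self):
        return len(self._queue)

    def is_idle(self):
        return self.pending_count() == 0 and not self._flushing


def build_component(pending=0, flushing=False):
    # object.__new__ allocates the instance without calling __init__,
    # so no backend connection is attempted.
    component = object.__new__(ExpensiveComponent)
    # Set only the attributes that pending_count() and is_idle() read.
    component._queue = list(range(pending))
    component._flushing = flushing
    return component


idle = build_component()
busy = build_component(pending=2)
flushing = build_component(flushing=True)
print(idle.is_idle(), busy.pending_count(), flushing.is_idle())
```

The trade-off is that such fixtures depend on private attribute names, so a rename inside the real class breaks the builder rather than the constructor call.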