diff --git a/.gitignore b/.gitignore index fedead23..721f067a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ files/* +docs/superpowers/ .DS_Store .idea/* */.idea/* @@ -15,4 +16,4 @@ dist/ media/ .MWebMetaData/ push.sh -assets/ \ No newline at end of file +assets/ diff --git a/README.md b/README.md index 7bde6250..f13682fc 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务, ### Rapidproxy代理 - + @@ -133,7 +133,7 @@ FirstSpider|2021-02-09 14:55:14,620|air_spider.py|run|line:80|INFO| 无任务, ### NovProxy - +g diff --git a/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md new file mode 100644 index 00000000..08b34b6f --- /dev/null +++ b/docs/superpowers/specs/2026-05-31-runtime-core-refactor-design.md @@ -0,0 +1,214 @@ +# Core Runtime Refactor Design + +Date: 2026-05-31 + +## Scope + +This refactor focuses on the runtime path that moves work through feapder: + +- `Scheduler` +- `Collector` +- `ParserControl` +- `RequestBuffer` +- `ItemBuffer` + +The first implementation phase keeps the public spider APIs compatible. It does not change how users subclass `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`, and it does not redesign their inheritance yet. The spider inheritance cleanup remains a follow-up phase after the runtime path is safer and better covered by tests. + +## Current Problems + +The runtime path works, but several responsibilities are tangled: + +- Thread lifecycle state is duplicated. Multiple classes maintain `_thread_stop`, `_is_adding_to_db`, `is_show_tip`, queue checks, and direct `_started.clear()` calls in slightly different ways. +- `ParserControl.deal_request` is too large. It handles parser lookup, download middleware, response fetching, validation, callback execution, result routing, retry, failed request persistence, metrics, browser release, and request deletion in one method. +- `ParserControl` and `AirSpiderParserControl` duplicate most request handling logic while differing mainly in queue backend and completion semantics. +- `Scheduler.all_thread_is_done()` depends on implementation details from collector and buffers. This makes shutdown sensitive to timing and temporary state. +- `RequestBuffer` and `ItemBuffer` both combine queueing, flushing, retry callbacks, persistence, and thread control. Their idle states are not explicit enough for a scheduler to reason about safely. +- Result handling rules are repeated in several places. A yielded value can be `Request`, `Item`, `UpdateItem`, callable, or invalid, but the classification logic is not centralized. +- Data safety concerns such as `eval`-based deserialization exist in adjacent runtime code. Replacing serialization is valuable, but it is not part of the first implementation slice unless touched code needs a small compatibility wrapper. + +## Goals + +1. Make runtime thread state explicit and consistent. +2. Split request processing into small units with clear responsibilities. +3. Reduce duplication between Redis-backed and in-memory parser control paths. +4. Make scheduler shutdown decisions depend on a small runtime status interface instead of scattered implementation details. +5. Preserve existing user-facing behavior, settings, Redis key formats, retry semantics, and callback semantics. +6. Add lightweight tests that do not require Redis, MySQL, browsers, or network access. + +## Non-Goals + +- Do not change the public constructor signatures of `Scheduler`, `AirSpider`, `Spider`, `TaskSpider`, or `BatchSpider`. +- Do not rewrite the storage format for requests, responses, failed items, or failed requests in this phase. +- Do not require Redis or MySQL for the new unit tests. +- Do not remove existing settings or documented behavior. +- Do not perform the spider inheritance refactor in this phase. + +## Proposed Architecture + +### Runtime Worker State + +Introduce a small shared lifecycle helper for runtime threads. The helper should expose: + +- `request_stop()` +- `is_stop_requested` +- `mark_busy()` +- `mark_idle()` +- `is_idle` + +`Collector`, `ParserControl`, `RequestBuffer`, and `ItemBuffer` can keep inheriting from `threading.Thread`, but their public state checks should move toward explicit methods: + +- `is_idle()` +- `pending_count()` +- `is_stopped()` where useful + +This allows `Scheduler` to ask each component for status instead of reading several unrelated flags and queues. + +### Runtime Status Interface + +Add a small internal status contract used by `Scheduler.all_thread_is_done()`: + +- Collector is done when it is idle and has no pending local or backend requests. +- Parser controls are done when each worker is idle. +- Request buffer is done when it has no queued writes, no queued deletions, and is not flushing. +- Item buffer is done when it has no queued items and is not flushing. + +The scheduler should keep the current three-pass stability check, but delegate each component check to the component itself. + +### Parser Request Processing + +Split `ParserControl.deal_request` into private units: + +- `_find_parser(request)` +- `_prepare_response(parser, request)` +- `_run_callback(parser, request, response)` +- `_dispatch_results(parser, request, results, request_redis)` +- `_handle_exception(parser, request, response, error, request_redis, used_download_midware)` +- `_finish_request(request_redis, finish_action)` +- `_sleep_after_request()` + +The method `deal_request` remains as the public entry point for compatibility, but becomes orchestration code. This makes retry and success paths easier to test independently. + +### Result Dispatcher + +Introduce an internal result dispatcher for `Request`, `Item`, and callable values. It should encode the current behavior: + +- `Request` values receive a default `parser_name`. +- Synchronous requests are processed immediately. +- Asynchronous requests go to `RequestBuffer`. +- `Item` and `UpdateItem` values go to `ItemBuffer`. +- Callable values follow the current "previous result type" rule: after an item they are item callbacks; otherwise they are request callbacks. +- Invalid non-`None` values raise `TypeError` with the parser and callback name. + +For the first phase, wire this dispatcher into `ParserControl`. Later phases can reuse it from `Scheduler`, `Spider`, `TaskSpider`, and `BatchSpider`. + +### ParserControl Variants + +Keep both `ParserControl` and `AirSpiderParserControl` classes for compatibility, but move shared behavior into a base implementation. The variants should differ only in: + +- How they receive work. +- How they mark a request complete. +- Whether failed requests are persisted to Redis. + +This avoids changing `AirSpider` behavior while removing duplicated parsing, retry, middleware, and result handling logic. + +### Buffer Semantics + +`RequestBuffer` should make these states explicit: + +- queued new requests +- queued deletion requests +- currently flushing + +`ItemBuffer` should make these states explicit: + +- queued items or callbacks +- currently flushing +- export retry counters + +Both buffers should keep `flush()` as a synchronous method. Their thread loop continues calling `flush()` periodically. `stop()` should request shutdown, and the scheduler should still call explicit flushing before treating the runtime as complete. + +### Error Handling + +The refactor must preserve current error behavior: + +- Request download exceptions continue to increment download exception metrics. +- Parser exceptions continue to increment parser exception metrics. +- Proxy deletion behavior remains based on `PROXY_MAX_FAILED_TIMES`. +- `exception_request` and `failed_request` compatibility hooks remain intact. +- Browser instances are returned to the render downloader in a `finally` block. +- The original request is preserved when download middleware mutates a request and a retry or failed-request persistence is needed. + +Error handling should become easier to read by returning a small internal result from `_handle_exception`, such as whether to delete the active Redis request via the request buffer or via the item buffer. + +## Data Flow + +1. `Scheduler` starts buffers, collector, parser workers, retry handlers, and optional start request distribution. +2. `Collector` atomically reserves ready Redis requests and exposes them through its local queue. +3. `ParserControl` obtains one request, marks itself busy, finds the parser, downloads or reuses a response, validates it, runs the callback, and dispatches yielded results. +4. `RequestBuffer` persists new requests and deletion markers. +5. `ItemBuffer` persists items and deletes completed Redis requests after item persistence succeeds. +6. `Scheduler` polls component status. It ends the spider only after all components report stable idle status. + +For `AirSpider`, the same parser-processing flow applies, but the request source is `MemoryDB` and request completion does not use Redis deletion markers. + +## Compatibility Strategy + +- Keep class names and imports stable. +- Keep public method names such as `deal_request`, `flush`, `stop`, `is_not_task`, `get_requests_count`, and `is_adding_to_db`. +- Add new internal methods instead of removing old methods immediately. +- Preserve existing Redis key names and serialized values. +- Preserve callback ordering and the current callable routing rule. +- Preserve warning, logging, and metric names unless a test proves an existing message is wrong. + +## Testing Strategy + +Add focused unit tests around the new internal boundaries: + +- Collector idle and pending status with a fake RedisDB. +- RequestBuffer status before enqueue, during flush, and after flush with a fake DB. +- ItemBuffer status before enqueue, during flush, and after successful fake pipeline export. +- ParserControl result dispatch for `Request`, synchronous `Request`, `Item`, callback, and invalid result. +- ParserControl retry behavior for a controlled exception without network access. +- Scheduler completion check using fake components that transition from busy to idle. + +Existing integration tests can remain as broader coverage. This phase should not require Redis, MySQL, Playwright, Selenium, or live HTTP. + +## Implementation Phases + +### Phase 1: Status Surfaces + +Add explicit idle and pending methods to collector and buffers. Update `Scheduler.all_thread_is_done()` to use these methods while preserving the three-pass stability check. + +### Phase 2: ParserControl Extraction + +Split `ParserControl.deal_request` into smaller private methods without changing behavior. Add tests for the extracted methods using fake parsers, fake buffers, and fake requests. + +### Phase 3: Shared Parser Runtime + +Move duplicated logic from `ParserControl` and `AirSpiderParserControl` into shared helpers or a shared base class. Keep the concrete classes as compatibility wrappers. + +### Phase 4: Runtime Result Dispatcher + +Introduce the internal dispatcher and wire it into parser controls. Keep legacy result behavior intact. + +### Phase 5: Buffer Stop and Flush Hardening + +Make stop behavior explicit. Ensure a stopped buffer can report pending work accurately and that final flushes are deterministic. + +## Acceptance Criteria + +- Public spider examples in the docs continue to run with the same API. +- `Scheduler.all_thread_is_done()` no longer reads buffer and parser internals directly when an explicit status method exists. +- `ParserControl.deal_request` is substantially shorter and delegates download, callback, dispatch, exception, and completion responsibilities. +- `ParserControl` and `AirSpiderParserControl` share request-processing behavior instead of duplicating it. +- New lightweight tests cover the status and dispatch behavior without external services. +- Existing compatible tests still pass in the available local environment. + +## Follow-Up Work + +After this runtime refactor, a separate design should address the spider type hierarchy. The likely direction is a shared `BaseSpider` plus a distributed branch: + +- `BaseSpider -> AirSpider` +- `BaseSpider -> Spider -> TaskSpider -> BatchSpider` + +That work should wait until the runtime path has clearer component boundaries and tests, because changing inheritance before stabilizing runtime behavior would make regressions harder to isolate. diff --git a/feapder/buffer/item_buffer.py b/feapder/buffer/item_buffer.py index 35f9bb01..80e21826 100644 --- a/feapder/buffer/item_buffer.py +++ b/feapder/buffer/item_buffer.py @@ -14,6 +14,7 @@ import feapder.utils.tools as tools from feapder import setting from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.dedup import Dedup from feapder.network.item import Item, UpdateItem from feapder.pipelines import BasePipeline @@ -31,6 +32,7 @@ class ItemBuffer(threading.Thread): def __init__(self, redis_key, task_table=None): if not hasattr(self, "_table_item"): super(ItemBuffer, self).__init__() + self._state = RuntimeState() self._thread_stop = False self._is_adding_to_db = False @@ -106,7 +108,8 @@ def mysql_pipeline(self): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: self.flush() tools.delay_time(setting.ITEM_UPLOAD_INTERVAL) @@ -114,6 +117,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def put_item(self, item): @@ -174,6 +178,15 @@ def flush(self): def get_items_count(self): return self._items_queue.qsize() + def pending_count(self): + return self.get_items_count() + + def is_idle(self): + return self.pending_count() == 0 and not self.is_adding_to_db() + + def is_stopped(self): + return self._state.is_stop_requested + def is_adding_to_db(self): return self._is_adding_to_db @@ -286,133 +299,134 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pip def __add_item_to_db( self, items, update_items, requests, callbacks, items_fingerprints ): - export_success = True self._is_adding_to_db = True + try: + export_success = True - # 去重 - if setting.ITEM_FILTER_ENABLE: - items, items_fingerprints = self.__dedup_items(items, items_fingerprints) + # 去重 + if setting.ITEM_FILTER_ENABLE: + items, items_fingerprints = self.__dedup_items(items, items_fingerprints) - # 分捡(返回值包含 pipelines_dict) - items_dict = self.__pick_items(items) - update_items_dict = self.__pick_items(update_items, is_update_item=True) + # 分捡(返回值包含 pipelines_dict) + items_dict = self.__pick_items(items) + update_items_dict = self.__pick_items(update_items, is_update_item=True) - # item批量入库 - failed_items = {"add": [], "update": [], "requests": []} - while items_dict: - table, datas = items_dict.popitem() - used_pipelines = self._item_pipelines.get(table) + # item批量入库 + failed_items = {"add": [], "update": [], "requests": []} + while items_dict: + table, datas = items_dict.popitem() + used_pipelines = self._item_pipelines.get(table) - log.debug( - """ + log.debug( + """ -------------- item 批量入库 -------------- 表名: %s datas: %s """ - % (table, tools.dumps_json(datas, indent=16)) - ) + % (table, tools.dumps_json(datas, indent=16)) + ) - if not self.__export_to_db(table, datas, used_pipelines=used_pipelines): - export_success = False - failed_items["add"].append({"table": table, "datas": datas}) + if not self.__export_to_db(table, datas, used_pipelines=used_pipelines): + export_success = False + failed_items["add"].append({"table": table, "datas": datas}) - # 执行批量update - while update_items_dict: - table, datas = update_items_dict.popitem() - used_pipelines = self._item_pipelines.get(table) + # 执行批量update + while update_items_dict: + table, datas = update_items_dict.popitem() + used_pipelines = self._item_pipelines.get(table) - log.debug( - """ + log.debug( + """ -------------- item 批量更新 -------------- 表名: %s datas: %s """ - % (table, tools.dumps_json(datas, indent=16)) - ) - - update_keys = self._item_update_keys.get(table) - if not self.__export_to_db( - table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines - ): - export_success = False - failed_items["update"].append( - {"table": table, "datas": datas, "update_keys": update_keys} + % (table, tools.dumps_json(datas, indent=16)) ) - if export_success: - # 执行回调 - while callbacks: - try: - callback = callbacks.pop(0) - callback() - except Exception as e: - log.exception(e) + update_keys = self._item_update_keys.get(table) + if not self.__export_to_db( + table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines + ): + export_success = False + failed_items["update"].append( + {"table": table, "datas": datas, "update_keys": update_keys} + ) + + if export_success: + # 执行回调 + while callbacks: + try: + callback = callbacks.pop(0) + callback() + except Exception as e: + log.exception(e) - # 删除做过的request - if requests: - self.redis_db.zrem(self._table_request, requests) + # 删除做过的request + if requests: + self.redis_db.zrem(self._table_request, requests) - # 去重入库 - if setting.ITEM_FILTER_ENABLE: - if items_fingerprints: - self.__class__.dedup.add(items_fingerprints, skip_check=True) - else: - failed_items["requests"] = requests + # 去重入库 + if setting.ITEM_FILTER_ENABLE: + if items_fingerprints: + self.__class__.dedup.add(items_fingerprints, skip_check=True) + else: + failed_items["requests"] = requests - if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES: - if self._redis_key != "air_spider": - # 失败的item记录到redis - self.redis_db.sadd(self._table_failed_items, failed_items) + if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES: + if self._redis_key != "air_spider": + # 失败的item记录到redis + self.redis_db.sadd(self._table_failed_items, failed_items) - # 删除做过的request - if requests: - self.redis_db.zrem(self._table_request, requests) + # 删除做过的request + if requests: + self.redis_db.zrem(self._table_request, requests) - log.error( - "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format( - tools.dumps_json(failed_items) + log.error( + "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format( + tools.dumps_json(failed_items) + ) ) - ) - self.export_retry_times = 0 + self.export_retry_times = 0 - else: - tip = ["入库不成功"] - if callbacks: - tip.append("不执行回调") - if requests: - tip.append("不删除任务") - exists = self.redis_db.zexists(self._table_request, requests) - for exist, request in zip(exists, requests): - if exist: - self.redis_db.zadd(self._table_request, requests, 300) - - if setting.ITEM_FILTER_ENABLE: - tip.append("数据不入去重库") + else: + tip = ["入库不成功"] + if callbacks: + tip.append("不执行回调") + if requests: + tip.append("不删除任务") + exists = self.redis_db.zexists(self._table_request, requests) + for exist, request in zip(exists, requests): + if exist: + self.redis_db.zadd(self._table_request, requests, 300) - if self._redis_key != "air_spider": - tip.append("将自动重试") + if setting.ITEM_FILTER_ENABLE: + tip.append("数据不入去重库") - tip.append("失败items:\n {}".format(tools.dumps_json(failed_items))) - log.error(",".join(tip)) + if self._redis_key != "air_spider": + tip.append("将自动重试") - self.export_falied_times += 1 + tip.append("失败items:\n {}".format(tools.dumps_json(failed_items))) + log.error(",".join(tip)) - if self._redis_key != "air_spider": - self.export_retry_times += 1 + self.export_falied_times += 1 - if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES: - # 报警 - msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format( - self._redis_key, self.export_falied_times - ) - log.error(msg) - tools.send_msg( - msg=msg, - level="error", - message_prefix="《%s》爬虫导出数据失败" % (self._redis_key), - ) + if self._redis_key != "air_spider": + self.export_retry_times += 1 - self._is_adding_to_db = False + if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES: + # 报警 + msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format( + self._redis_key, self.export_falied_times + ) + log.error(msg) + tools.send_msg( + msg=msg, + level="error", + message_prefix="《%s》爬虫导出数据失败" % (self._redis_key), + ) + finally: + self._is_adding_to_db = False def metric_datas(self, table, datas): """ diff --git a/feapder/buffer/request_buffer.py b/feapder/buffer/request_buffer.py index 70677a94..f0c7be17 100644 --- a/feapder/buffer/request_buffer.py +++ b/feapder/buffer/request_buffer.py @@ -15,6 +15,7 @@ import feapder.utils.tools as tools from feapder.db.memorydb import MemoryDB from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.dedup import Dedup from feapder.utils.log import log @@ -60,6 +61,7 @@ class RequestBuffer(AirSpiderRequestBuffer, threading.Thread): def __init__(self, redis_key): AirSpiderRequestBuffer.__init__(self, db=RedisDB(), dedup_name=redis_key) threading.Thread.__init__(self) + self._state = RuntimeState() self._thread_stop = False self._is_adding_to_db = False @@ -74,7 +76,8 @@ def __init__(self, redis_key): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: self.__add_request_to_db() except Exception as e: @@ -84,6 +87,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def put_request(self, request): @@ -113,62 +117,74 @@ def flush(self): def get_requests_count(self): return len(self._requests_deque) + def get_delete_requests_count(self): + return len(self._del_requests_deque) + + def pending_count(self): + return self.get_requests_count() + self.get_delete_requests_count() + + def is_idle(self): + return self.pending_count() == 0 and not self.is_adding_to_db() + + def is_stopped(self): + return self._state.is_stop_requested + def is_adding_to_db(self): return self._is_adding_to_db def __add_request_to_db(self): + self._is_adding_to_db = True request_list = [] prioritys = [] callbacks = [] - - while self._requests_deque: - request = self._requests_deque.popleft() - self._is_adding_to_db = True - - if callable(request): - # 函数 - # 注意:应该考虑闭包情况。闭包情况可写成 - # def test(xxx = xxx): - # # TODO 业务逻辑 使用 xxx - # 这么写不会导致xxx为循环结束后的最后一个值 - callbacks.append(request) - continue - - priority = request.priority - - # 如果需要去重并且库中已重复 则continue - if self.is_exist_request(request): - continue - else: - request_list.append(str(request.to_dict)) - prioritys.append(priority) - - if len(request_list) > MAX_URL_COUNT: + try: + while self._requests_deque: + request = self._requests_deque.popleft() + + if callable(request): + # 函数 + # 注意:应该考虑闭包情况。闭包情况可写成 + # def test(xxx = xxx): + # # TODO 业务逻辑 使用 xxx + # 这么写不会导致xxx为循环结束后的最后一个值 + callbacks.append(request) + continue + + priority = request.priority + + # 如果需要去重并且库中已重复 则continue + if self.is_exist_request(request): + continue + else: + request_list.append(str(request.to_dict)) + prioritys.append(priority) + + if len(request_list) > MAX_URL_COUNT: + self._db.zadd(self._table_request, request_list, prioritys) + request_list = [] + prioritys = [] + + # 入库 + if request_list: self._db.zadd(self._table_request, request_list, prioritys) - request_list = [] - prioritys = [] - - # 入库 - if request_list: - self._db.zadd(self._table_request, request_list, prioritys) - - # 执行回调 - for callback in callbacks: - try: - callback() - except Exception as e: - log.exception(e) - - # 删除已做任务 - if self._del_requests_deque: - request_done_list = [] - while self._del_requests_deque: - request_done_list.append(self._del_requests_deque.popleft()) - - # 去掉request_list中的requests, 否则可能会将刚添加的request删除 - request_done_list = list(set(request_done_list) - set(request_list)) - if request_done_list: - self._db.zrem(self._table_request, request_done_list) - - self._is_adding_to_db = False + # 执行回调 + for callback in callbacks: + try: + callback() + except Exception as e: + log.exception(e) + + # 删除已做任务 + if self._del_requests_deque: + request_done_list = [] + while self._del_requests_deque: + request_done_list.append(self._del_requests_deque.popleft()) + + # 去掉request_list中的requests, 否则可能会将刚添加的request删除 + request_done_list = list(set(request_done_list) - set(request_list)) + + if request_done_list: + self._db.zrem(self._table_request, request_done_list) + finally: + self._is_adding_to_db = False diff --git a/feapder/core/collector.py b/feapder/core/collector.py index 5b8ff652..2f9f2822 100644 --- a/feapder/core/collector.py +++ b/feapder/core/collector.py @@ -15,6 +15,7 @@ import feapder.setting as setting import feapder.utils.tools as tools from feapder.db.redisdb import RedisDB +from feapder.core.runtime_state import RuntimeState from feapder.network.request import Request from feapder.utils.log import log @@ -30,6 +31,7 @@ def __init__(self, redis_key): """ super(Collector, self).__init__() + self._state = RuntimeState() self._db = RedisDB() self._thread_stop = False @@ -40,9 +42,11 @@ def __init__(self, redis_key): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: - self.__input_data() + with self._state.busy(): + self.__input_data() except Exception as e: log.exception(e) time.sleep(0.1) @@ -51,6 +55,7 @@ def run(self): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def __input_data(self): @@ -112,5 +117,14 @@ def get_requests_count(self): self._todo_requests.qsize() or self._db.zget_count(self._tab_requests) or 0 ) + def pending_count(self): + return self.get_requests_count() + + def is_idle(self): + return not self.is_collector_task() and self.pending_count() == 0 + + def is_stopped(self): + return self._state.is_stop_requested + def is_collector_task(self): return self._is_collector_task diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 021d2956..646cb2a8 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -18,6 +18,8 @@ from feapder.buffer.item_buffer import ItemBuffer from feapder.buffer.request_buffer import AirSpiderRequestBuffer from feapder.core.base_parser import BaseParser +from feapder.core.result_dispatcher import ResultDispatcher +from feapder.core.runtime_state import RuntimeState from feapder.db.memorydb import MemoryDB from feapder.network.item import Item from feapder.network.request import Request @@ -42,6 +44,7 @@ class ParserControl(threading.Thread): def __init__(self, collector, redis_key, request_buffer, item_buffer): super(ParserControl, self).__init__() + self._state = RuntimeState() self._parsers = [] self._collector = collector self._redis_key = redis_key @@ -52,7 +55,8 @@ def __init__(self, collector, redis_key, request_buffer, item_buffer): def run(self): self._thread_stop = False - while not self._thread_stop: + self._state = RuntimeState() + while not self._state.is_stop_requested: try: request = self._collector.get_request() if not request: @@ -62,7 +66,8 @@ def run(self): continue self.is_show_tip = False - self.deal_request(request) + with self._state.busy(): + self.deal_request(request) except Exception as e: log.exception(e) @@ -70,345 +75,141 @@ def run(self): def is_not_task(self): return self.is_show_tip + def is_idle(self): + return self.is_not_task() and self._state.is_idle + + def is_stopped(self): + return self._state.is_stop_requested + @classmethod def get_task_status_count(cls): return cls._failed_task_count, cls._success_task_count, cls._total_task_count - def deal_request(self, request): - response = None - request_redis = request["request_redis"] - request = request["request_obj"] - - del_request_redis_after_item_to_db = False - del_request_redis_after_request_to_db = False + def _make_sync_request_payload(self, request): + return {"request_obj": request, "request_redis": None} + def _find_parser(self, request): for parser in self._parsers: if parser.name == request.parser_name: - used_download_midware_enable = False - try: - self.__class__._total_task_count += 1 - # 记录需下载的文档 - self.record_download_status( - ParserControl.DOWNLOAD_TOTAL, parser.name - ) - - # 解析request - if request.auto_request: - request_temp = None - response = None - - # 下载中间件 - if request.download_midware: - if isinstance(request.download_midware, (list, tuple)): - request_temp = request - for download_midware in request.download_midware: - download_midware = ( - download_midware - if callable(download_midware) - else tools.get_method(parser, download_midware) - ) - request_temp = download_midware(request_temp) - else: - download_midware = ( - request.download_midware - if callable(request.download_midware) - else tools.get_method( - parser, request.download_midware - ) - ) - request_temp = download_midware(request) - elif request.download_midware != False: - request_temp = parser.download_midware(request) - - # 请求 - if request_temp: - if ( - isinstance(request_temp, (tuple, list)) - and len(request_temp) == 2 - ): - request_temp, response = request_temp - - if not isinstance(request_temp, Request): - raise Exception( - "download_midware need return a request, but received type: {}".format( - type(request_temp) - ) - ) - used_download_midware_enable = True - if response is None: - response = ( - request_temp.get_response() - if not setting.RESPONSE_CACHED_USED - else request_temp.get_response_from_cached( - save_cached=False - ) - ) - else: - response = ( - request.get_response() - if not setting.RESPONSE_CACHED_USED - else request.get_response_from_cached(save_cached=False) - ) - - if response == None: - raise Exception( - "连接超时 url: %s" % (request.url or request_temp.url) - ) - - # 校验 - if parser.validate(request, response) == False: - break - - else: - response = None - - if request.callback: # 如果有parser的回调函数,则用回调处理 - callback_parser = ( - request.callback - if callable(request.callback) - else tools.get_method(parser, request.callback) - ) - results = callback_parser(request, response) - else: # 否则默认用parser处理 - results = parser.parse(request, response) - - if results and not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") - ) - - # 标识上一个result是什么 - result_type = 0 # 0\1\2 (初始值\request\item) - # 此处判断是request 还是 item - for result in results or []: - if isinstance(result, Request): - result_type = 1 - # 给request的 parser_name 赋值 - result.parser_name = result.parser_name or parser.name - - # 判断是同步的callback还是异步的 - if result.request_sync: # 同步 - request_dict = { - "request_obj": result, - "request_redis": None, - } - self.deal_request(request_dict) - else: # 异步 - # 将next_request 入库 - self._request_buffer.put_request(result) - del_request_redis_after_request_to_db = True - - elif isinstance(result, Item): - result_type = 2 - # 将item入库 - self._item_buffer.put_item(result) - # 需删除正在做的request - del_request_redis_after_item_to_db = True - - elif callable(result): # result为可执行的无参函数 - if result_type == 2: # item 的 callback,buffer里的item均入库后再执行 - self._item_buffer.put_item(result) - del_request_redis_after_item_to_db = True - - else: # result_type == 1: # request 的 callback,buffer里的request均入库后再执行。可能有的parser直接返回callback - self._request_buffer.put_request(result) - del_request_redis_after_request_to_db = True - - elif result is not None: - function_name = "{}.{}".format( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - ) - raise TypeError( - f"{function_name} result expect Request、Item or callback, bug get type: {type(result)}" - ) - - except Exception as e: - exception_type = ( - str(type(e)).replace("", "") - ) - if exception_type.startswith("requests"): - # 记录下载失败的文档 - self.record_download_status( - ParserControl.DOWNLOAD_EXCEPTION, parser.name - ) - if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: - request.del_proxy() - - else: - # 记录解析程序异常 - self.record_download_status( - ParserControl.PAESERS_EXCEPTION, parser.name - ) - - if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 - log.exception(e) - - log.error( - """ - -------------- %s.%s error ------------- - error %s - response %s - deal request %s - """ - % ( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - str(e), - response, - tools.dumps_json(request.to_dict, indent=28) - if setting.LOG_LEVEL == "DEBUG" - else request, - ) - ) + return parser + return None - request.error_msg = "%s: %s" % (exception_type, e) - request.response = str(response) - - if "Invalid URL" in str(e): - request.is_abandoned = True - - requests = parser.exception_request(request, response, e) or [ - request - ] - if not isinstance(requests, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "exception_request") - ) - for request in requests: - if callable(request): - self._request_buffer.put_request(request) - continue - - if not isinstance(request, Request): - raise Exception("exception_request 需 yield request") - - if ( - request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES - or request.is_abandoned - ): - self.__class__._failed_task_count += 1 # 记录失败任务数 - - # 处理failed_request的返回值 request 或 func - results = parser.failed_request(request, response, e) or [ - request - ] - if not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "failed_request") - ) + def _prepare_response(self, parser, request): + used_download_midware_enable = False + if not request.auto_request: + return request, None, used_download_midware_enable - for result in results: - if isinstance(result, Request): - if setting.SAVE_FAILED_REQUEST: - if used_download_midware_enable: - # 去掉download_midware 添加的属性 - original_request = ( - Request.from_dict(eval(request_redis)) - if request_redis - else result - ) - original_request.error_msg = ( - request.error_msg - ) - original_request.response = request.response - - self._request_buffer.put_failed_request( - original_request - ) - else: - self._request_buffer.put_failed_request( - result - ) - - elif callable(result): - self._request_buffer.put_request(result) - - elif isinstance(result, Item): - self._item_buffer.put_item(result) - - del_request_redis_after_request_to_db = True - - else: - # 将 requests 重新入库 爬取 - request.retry_times += 1 - request.filter_repeat = False - log.info( - """ - 入库 等待重试 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, - ) - ) - if used_download_midware_enable: - # 去掉download_midware 添加的属性 使用原来的requests - original_request = ( - Request.from_dict(eval(request_redis)) - if request_redis - else request - ) - if hasattr(request, "error_msg"): - original_request.error_msg = request.error_msg - if hasattr(request, "response"): - original_request.response = request.response - original_request.retry_times = request.retry_times - original_request.filter_repeat = request.filter_repeat - - self._request_buffer.put_request(original_request) - else: - self._request_buffer.put_request(request) - del_request_redis_after_request_to_db = True + request_temp = None + response = None - else: - # 记录下载成功的文档 - self.record_download_status( - ParserControl.DOWNLOAD_SUCCESS, parser.name + # 下载中间件 + if request.download_midware: + if isinstance(request.download_midware, (list, tuple)): + request_temp = request + for download_midware in request.download_midware: + download_midware = ( + download_midware + if callable(download_midware) + else tools.get_method(parser, download_midware) ) - # 记录成功任务数 - self.__class__._success_task_count += 1 - - # 缓存下载成功的文档 - if setting.RESPONSE_CACHED_ENABLE: - request.save_cached( - response=response, - expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, - ) - - finally: - # 释放浏览器 - if response and getattr(response, "browser", None): - request.render_downloader.put_back(response.browser) - - break - - # 删除正在做的request 跟随item优先 - if request_redis: - if del_request_redis_after_item_to_db: - self._item_buffer.put_item(request_redis) - - elif del_request_redis_after_request_to_db: - self._request_buffer.put_del_request(request_redis) - + request_temp = download_midware(request_temp) else: - self._request_buffer.put_del_request(request_redis) - + download_midware = ( + request.download_midware + if callable(request.download_midware) + else tools.get_method(parser, request.download_midware) + ) + request_temp = download_midware(request) + elif request.download_midware != False: + request_temp = parser.download_midware(request) + + # 请求 + if request_temp: + if isinstance(request_temp, (tuple, list)) and len(request_temp) == 2: + request_temp, response = request_temp + + if not isinstance(request_temp, Request): + raise Exception( + "download_midware need return a request, but received type: {}".format( + type(request_temp) + ) + ) + used_download_midware_enable = True + request = request_temp + + return request, response, used_download_midware_enable + + def _download_response(self, request, response): + if response is None: + response = ( + request.get_response() + if not setting.RESPONSE_CACHED_USED + else request.get_response_from_cached(save_cached=False) + ) + + if response == None: + raise Exception("连接超时 url: %s" % request.url) + + return response + + def _run_callback(self, parser, request, response): + if request.callback: # 如果有parser的回调函数,则用回调处理 + callback_parser = ( + request.callback + if callable(request.callback) + else tools.get_method(parser, request.callback) + ) + return callback_parser(request, response) + + # 否则默认用parser处理 + return parser.parse(request, response) + + def _dispatch_results(self, parser, request, results): + dispatcher = ResultDispatcher( + request_buffer=self._request_buffer, + item_buffer=self._item_buffer, + deal_request=self.deal_request, + sync_request_factory=self._make_sync_request_payload, + allow_callable=True, + ) + return dispatcher.dispatch(parser, request, results) + + def _finish_request( + self, + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ): + # 删除正在做的request 跟随item优先 + if not request_redis: + return + + if del_request_redis_after_item_to_db: + self._item_buffer.put_item(request_redis) + elif del_request_redis_after_request_to_db: + self._request_buffer.put_del_request(request_redis) + else: + self._request_buffer.put_del_request(request_redis) + + def _record_success(self, parser, request, response): + # 记录下载成功的文档 + self.record_download_status(ParserControl.DOWNLOAD_SUCCESS, parser.name) + # 记录成功任务数 + self.__class__._success_task_count += 1 + + # 缓存下载成功的文档 + if setting.RESPONSE_CACHED_ENABLE: + request.save_cached( + response=response, + expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, + ) + + def _release_response_browser(self, request, response): + # 释放浏览器 + if response and getattr(response, "browser", None): + request.render_downloader.put_back(response.browser) + + def _sleep_after_request(self): if setting.SPIDER_SLEEP_TIME: if ( isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list)) @@ -421,6 +222,224 @@ def deal_request(self, request): else: time.sleep(setting.SPIDER_SLEEP_TIME) + def _original_request_after_middleware(self, request_redis, request): + return Request.from_dict(eval(request_redis)) if request_redis else request + + def _handle_exception( + self, + parser, + request, + request_redis, + response, + exception, + used_download_midware_enable, + ): + del_request_redis_after_item_to_db = False + del_request_redis_after_request_to_db = False + + exception_type = str(type(exception)).replace("", "") + if exception_type.startswith("requests"): + # 记录下载失败的文档 + self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name) + if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: + request.del_proxy() + + else: + # 记录解析程序异常 + self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name) + + if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 + log.exception(exception) + + log.error( + """ + -------------- %s.%s error ------------- + error %s + response %s + deal request %s + """ + % ( + parser.name, + ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) + or "parse", + str(exception), + response, + tools.dumps_json(request.to_dict, indent=28) + if setting.LOG_LEVEL == "DEBUG" + else request, + ) + ) + + request.error_msg = "%s: %s" % (exception_type, exception) + request.response = str(response) + + if "Invalid URL" in str(exception): + request.is_abandoned = True + + requests = parser.exception_request(request, response, exception) or [request] + if not isinstance(requests, Iterable): + raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request")) + + for exception_request in requests: + if callable(exception_request): + self._request_buffer.put_request(exception_request) + continue + + if not isinstance(exception_request, Request): + raise Exception("exception_request 需 yield request") + + if ( + exception_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES + or exception_request.is_abandoned + ): + self.__class__._failed_task_count += 1 # 记录失败任务数 + + # 处理failed_request的返回值 request 或 func + results = parser.failed_request( + exception_request, response, exception + ) or [exception_request] + if not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, "failed_request") + ) + + for result in results: + if isinstance(result, Request): + if setting.SAVE_FAILED_REQUEST: + if used_download_midware_enable: + # 去掉download_midware 添加的属性 + original_request = self._original_request_after_middleware( + request_redis, result + ) + original_request.error_msg = exception_request.error_msg + original_request.response = exception_request.response + + self._request_buffer.put_failed_request( + original_request + ) + else: + self._request_buffer.put_failed_request(result) + + elif callable(result): + self._request_buffer.put_request(result) + + elif isinstance(result, Item): + self._item_buffer.put_item(result) + + del_request_redis_after_request_to_db = True + + else: + # 将 requests 重新入库 爬取 + exception_request.retry_times += 1 + exception_request.filter_repeat = False + log.info( + """ + 入库 等待重试 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + exception_request.url, + exception_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) + ) + if used_download_midware_enable: + # 去掉download_midware 添加的属性 使用原来的requests + original_request = self._original_request_after_middleware( + request_redis, exception_request + ) + if hasattr(exception_request, "error_msg"): + original_request.error_msg = exception_request.error_msg + if hasattr(exception_request, "response"): + original_request.response = exception_request.response + original_request.retry_times = exception_request.retry_times + original_request.filter_repeat = exception_request.filter_repeat + + self._request_buffer.put_request(original_request) + else: + self._request_buffer.put_request(exception_request) + del_request_redis_after_request_to_db = True + + return ( + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) + + def deal_request(self, request): + response = None + request_redis = request["request_redis"] + request = request["request_obj"] + + del_request_redis_after_item_to_db = False + del_request_redis_after_request_to_db = False + used_download_midware_enable = False + + parser = self._find_parser(request) + if not parser: + self._finish_request( + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) + self._sleep_after_request() + return + + try: + self.__class__._total_task_count += 1 + # 记录需下载的文档 + self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name) + + # 解析request + request, response, used_download_midware_enable = self._prepare_response( + parser, request + ) + if request.auto_request: + response = self._download_response(request, response) + + # 校验 + if request.auto_request and parser.validate(request, response) == False: + return + + results = self._run_callback(parser, request, response) + dispatch_result = self._dispatch_results(parser, request, results) + del_request_redis_after_item_to_db = ( + dispatch_result.del_request_redis_after_item_to_db + ) + del_request_redis_after_request_to_db = ( + dispatch_result.del_request_redis_after_request_to_db + ) + + except Exception as e: + ( + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) = self._handle_exception( + parser, + request, + request_redis, + response, + e, + used_download_midware_enable, + ) + + else: + self._record_success(parser, request, response) + + finally: + self._release_response_browser(request, response) + self._finish_request( + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ) + self._sleep_after_request() + def record_download_status(self, status, spider): """ 记录html等文档下载状态 @@ -431,6 +450,7 @@ def record_download_status(self, status, spider): def stop(self): self._thread_stop = True + self._state.request_stop() self._started.clear() def add_parser(self, parser: BaseParser): @@ -467,6 +487,7 @@ def __init__( item_buffer: ItemBuffer, ): super(ParserControl, self).__init__() + self._state = RuntimeState() self._parsers = [] self._memory_db = memory_db self._thread_stop = False @@ -474,7 +495,9 @@ def __init__( self._item_buffer = item_buffer def run(self): - while not self._thread_stop: + self._thread_stop = False + self._state = RuntimeState() + while not self._state.is_stop_requested: try: request = self._memory_db.get() if not request: @@ -484,264 +507,184 @@ def run(self): continue self.is_show_tip = False - self.deal_request(request) + with self._state.busy(): + self.deal_request(request) except Exception as e: log.exception(e) - def deal_request(self, request): - response = None + def _make_sync_request_payload(self, request): + return request - for parser in self._parsers: - if parser.name == request.parser_name: - try: - self.__class__._total_task_count += 1 - # 记录需下载的文档 - self.record_download_status( - ParserControl.DOWNLOAD_TOTAL, parser.name - ) + def _dispatch_results(self, parser, request, results): + dispatcher = ResultDispatcher( + request_buffer=self._request_buffer, + item_buffer=self._item_buffer, + deal_request=self.deal_request, + sync_request_factory=self._make_sync_request_payload, + allow_callable=False, + ) + return dispatcher.dispatch(parser, request, results) - # 解析request - if request.auto_request: - request_temp = None - response = None - - # 下载中间件 - if request.download_midware: - if isinstance(request.download_midware, (list, tuple)): - request_temp = request - for download_midware in request.download_midware: - download_midware = ( - download_midware - if callable(download_midware) - else tools.get_method(parser, download_midware) - ) - request_temp = download_midware(request_temp) - else: - download_midware = ( - request.download_midware - if callable(request.download_midware) - else tools.get_method( - parser, request.download_midware - ) - ) - request_temp = download_midware(request) - elif request.download_midware != False: - request_temp = parser.download_midware(request) - - # 请求 - if request_temp: - if ( - isinstance(request_temp, (tuple, list)) - and len(request_temp) == 2 - ): - request_temp, response = request_temp - - if not isinstance(request_temp, Request): - raise Exception( - "download_midware need return a request, but received type: {}".format( - type(request_temp) - ) - ) - request = request_temp - - if response is None: - response = ( - request.get_response() - if not setting.RESPONSE_CACHED_USED - else request.get_response_from_cached(save_cached=False) - ) - - # 校验 - if parser.validate(request, response) == False: - break - - else: - response = None - - if request.callback: # 如果有parser的回调函数,则用回调处理 - callback_parser = ( - request.callback - if callable(request.callback) - else tools.get_method(parser, request.callback) - ) - results = callback_parser(request, response) - else: # 否则默认用parser处理 - results = parser.parse(request, response) - - if results and not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") - ) - - # 此处判断是request 还是 item - for result in results or []: - if isinstance(result, Request): - # 给request的 parser_name 赋值 - result.parser_name = result.parser_name or parser.name - - # 判断是同步的callback还是异步的 - if result.request_sync: # 同步 - self.deal_request(result) - else: # 异步 - # 将next_request 入库 - self._request_buffer.put_request(result) - - elif isinstance(result, Item): - self._item_buffer.put_item(result) - elif result is not None: - function_name = "{}.{}".format( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - ) - raise TypeError( - f"{function_name} result expect Request or Item, bug get type: {type(result)}" - ) - - except Exception as e: - exception_type = ( - str(type(e)).replace("", "") - ) - if exception_type.startswith("requests"): - # 记录下载失败的文档 - self.record_download_status( - ParserControl.DOWNLOAD_EXCEPTION, parser.name - ) - if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: - request.del_proxy() - - else: - # 记录解析程序异常 - self.record_download_status( - ParserControl.PAESERS_EXCEPTION, parser.name - ) - - if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 - log.exception(e) - - log.error( - """ - -------------- %s.%s error ------------- - error %s - response %s - deal request %s - """ - % ( - parser.name, - ( - request.callback - and callable(request.callback) - and getattr(request.callback, "__name__") - or request.callback - ) - or "parse", - str(e), - response, - tools.dumps_json(request.to_dict, indent=28) - if setting.LOG_LEVEL == "DEBUG" - else request, - ) - ) - - request.error_msg = "%s: %s" % (exception_type, e) - request.response = str(response) - - if "Invalid URL" in str(e): - request.is_abandoned = True - - requests = parser.exception_request(request, response, e) or [ - request - ] - if not isinstance(requests, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "exception_request") - ) - for request in requests: - if not isinstance(request, Request): - raise Exception("exception_request 需 yield request") - - if ( - request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES - or request.is_abandoned - ): - self.__class__._failed_task_count += 1 # 记录失败任务数 - - # 处理failed_request的返回值 request 或 func - results = parser.failed_request(request, response, e) or [ - request - ] - if not isinstance(results, Iterable): - raise Exception( - "%s.%s返回值必须可迭代" % (parser.name, "failed_request") - ) - - log.info( - """ - 任务超过最大重试次数,丢弃 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, - ) - ) - - else: - # 将 requests 重新入库 爬取 - request.retry_times += 1 - request.filter_repeat = False - log.info( - """ - 入库 等待重试 - url %s - 重试次数 %s - 最大允许重试次数 %s""" - % ( - request.url, - request.retry_times, - setting.SPIDER_MAX_RETRY_TIMES, - ) - ) - self._request_buffer.put_request(request) + def _finish_request( + self, + request_redis, + del_request_redis_after_item_to_db, + del_request_redis_after_request_to_db, + ): + assert request_redis is None + assert del_request_redis_after_item_to_db is False + assert del_request_redis_after_request_to_db is False + return + + def _download_response(self, request, response): + if response is None: + response = ( + request.get_response() + if not setting.RESPONSE_CACHED_USED + else request.get_response_from_cached(save_cached=False) + ) + + return response + + def _handle_air_exception(self, parser, request, response, error): + exception_type = str(type(error)).replace("", "") + if exception_type.startswith("requests"): + # 记录下载失败的文档 + self.record_download_status(ParserControl.DOWNLOAD_EXCEPTION, parser.name) + if request.retry_times % setting.PROXY_MAX_FAILED_TIMES == 0: + request.del_proxy() + + else: + # 记录解析程序异常 + self.record_download_status(ParserControl.PAESERS_EXCEPTION, parser.name) + + if setting.LOG_LEVEL == "DEBUG": # 只有debug模式下打印, 超时的异常篇幅太多 + log.exception(error) + + log.error( + """ + -------------- %s.%s error ------------- + error %s + response %s + deal request %s + """ + % ( + parser.name, + ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) + or "parse", + str(error), + response, + tools.dumps_json(request.to_dict, indent=28) + if setting.LOG_LEVEL == "DEBUG" + else request, + ) + ) - else: - # 记录下载成功的文档 - self.record_download_status( - ParserControl.DOWNLOAD_SUCCESS, parser.name - ) - # 记录成功任务数 - self.__class__._success_task_count += 1 + request.error_msg = "%s: %s" % (exception_type, error) + request.response = str(response) - # 缓存下载成功的文档 - if setting.RESPONSE_CACHED_ENABLE: - request.save_cached( - response=response, - expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME, - ) + if "Invalid URL" in str(error): + request.is_abandoned = True - finally: - # 释放浏览器 - if response and getattr(response, "browser", None): - request.render_downloader.put_back(response.browser) + requests = parser.exception_request(request, response, error) or [request] + if not isinstance(requests, Iterable): + raise Exception("%s.%s返回值必须可迭代" % (parser.name, "exception_request")) - break + for retry_request in requests: + if not isinstance(retry_request, Request): + raise Exception("exception_request 需 yield request") - if setting.SPIDER_SLEEP_TIME: if ( - isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list)) - and len(setting.SPIDER_SLEEP_TIME) == 2 + retry_request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES + or retry_request.is_abandoned ): - sleep_time = random.randint( - int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1]) + self.__class__._failed_task_count += 1 # 记录失败任务数 + + # 处理failed_request的返回值 request + results = parser.failed_request(retry_request, response, error) or [ + retry_request + ] + if not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, "failed_request") + ) + + log.info( + """ + 任务超过最大重试次数,丢弃 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + retry_request.url, + retry_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) ) - time.sleep(sleep_time) + else: - time.sleep(setting.SPIDER_SLEEP_TIME) + # 将 requests 重新入库 爬取 + retry_request.retry_times += 1 + retry_request.filter_repeat = False + log.info( + """ + 入库 等待重试 + url %s + 重试次数 %s + 最大允许重试次数 %s""" + % ( + retry_request.url, + retry_request.retry_times, + setting.SPIDER_MAX_RETRY_TIMES, + ) + ) + self._request_buffer.put_request(retry_request) + + def deal_request(self, request): + response = None + + parser = self._find_parser(request) + if not parser: + self._sleep_after_request() + return + + try: + self.__class__._total_task_count += 1 + # 记录需下载的文档 + self.record_download_status(ParserControl.DOWNLOAD_TOTAL, parser.name) + + # 解析request + request, response, used_download_midware_enable = self._prepare_response( + parser, request + ) + if request.auto_request: + response = self._download_response(request, response) + + # 校验 + if request.auto_request and parser.validate(request, response) == False: + return + + results = self._run_callback(parser, request, response) + self._dispatch_results(parser, request, results) + + except Exception as e: + self._handle_air_exception( + parser=parser, + request=request, + response=response, + error=e, + ) + + else: + self._record_success(parser, request, response) + + finally: + self._release_response_browser(request, response) + self._finish_request(None, False, False) + self._sleep_after_request() diff --git a/feapder/core/result_dispatcher.py b/feapder/core/result_dispatcher.py new file mode 100644 index 00000000..43186cd7 --- /dev/null +++ b/feapder/core/result_dispatcher.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +Parser result routing for runtime parser controls. +""" +from collections.abc import Iterable +from dataclasses import dataclass + +from feapder.network.item import Item +from feapder.network.request import Request + + +@dataclass +class DispatchResult: + del_request_redis_after_item_to_db: bool = False + del_request_redis_after_request_to_db: bool = False + + +class ResultDispatcher: + REQUEST_RESULT = 1 + ITEM_RESULT = 2 + + def __init__( + self, + *, + request_buffer, + item_buffer, + deal_request, + sync_request_factory=None, + allow_callable=True, + ): + self._request_buffer = request_buffer + self._item_buffer = item_buffer + self._deal_request = deal_request + self._sync_request_factory = sync_request_factory or (lambda request: request) + self._allow_callable = allow_callable + + def dispatch(self, parser, request, results): + if results and not isinstance(results, Iterable): + raise Exception( + "%s.%s返回值必须可迭代" % (parser.name, request.callback or "parse") + ) + + dispatch_result = DispatchResult() + result_type = 0 + + for result in results or []: + if isinstance(result, Request): + result_type = self.REQUEST_RESULT + result.parser_name = result.parser_name or parser.name + if result.request_sync: + self._deal_request(self._sync_request_factory(result)) + else: + self._request_buffer.put_request(result) + dispatch_result.del_request_redis_after_request_to_db = True + + elif isinstance(result, Item): + result_type = self.ITEM_RESULT + self._item_buffer.put_item(result) + dispatch_result.del_request_redis_after_item_to_db = True + + elif callable(result) and self._allow_callable: + if result_type == self.ITEM_RESULT: + self._item_buffer.put_item(result) + dispatch_result.del_request_redis_after_item_to_db = True + else: + self._request_buffer.put_request(result) + dispatch_result.del_request_redis_after_request_to_db = True + + elif result is not None: + raise TypeError(self._format_type_error(parser, request, result)) + + return dispatch_result + + def _format_type_error(self, parser, request, result): + callback_name = ( + request.callback + and callable(request.callback) + and getattr(request.callback, "__name__") + or request.callback + ) or "parse" + expected = "Request、Item or callback" if self._allow_callable else "Request or Item" + return ( + f"{parser.name}.{callback_name} result expect {expected}, " + f"bug get type: {type(result)}" + ) diff --git a/feapder/core/runtime_state.py b/feapder/core/runtime_state.py new file mode 100644 index 00000000..a8cac210 --- /dev/null +++ b/feapder/core/runtime_state.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +Small runtime state helper for background runtime threads. +""" +from contextlib import contextmanager +from threading import RLock + + +class RuntimeState: + def __init__(self): + self._stop_requested = False + self._busy_count = 0 + self._lock = RLock() + + def request_stop(self): + with self._lock: + self._stop_requested = True + + @property + def is_stop_requested(self): + with self._lock: + return self._stop_requested + + def mark_busy(self): + with self._lock: + self._busy_count += 1 + + def mark_idle(self): + with self._lock: + if self._busy_count > 0: + self._busy_count -= 1 + + @property + def busy_count(self): + with self._lock: + return self._busy_count + + @property + def is_idle(self): + return self.busy_count == 0 + + @contextmanager + def busy(self): + self.mark_busy() + try: + yield + finally: + self.mark_idle() diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py index 0177d185..7fb1fcc0 100644 --- a/feapder/core/scheduler.py +++ b/feapder/core/scheduler.py @@ -286,32 +286,19 @@ def _start(self): self.__add_task() def all_thread_is_done(self): - # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性 + # Check three times to avoid transient idle states between runtime stages. for i in range(3): - # 检测 collector 状态 - if ( - self._collector.is_collector_task() - or self._collector.get_requests_count() > 0 - ): + if not self._collector.is_idle(): return False - # 检测 parser_control 状态 for parser_control in self._parser_controls: - if not parser_control.is_not_task(): + if not parser_control.is_idle(): return False - # 检测 item_buffer 状态 - if ( - self._item_buffer.get_items_count() > 0 - or self._item_buffer.is_adding_to_db() - ): + if not self._item_buffer.is_idle(): return False - # 检测 request_buffer 状态 - if ( - self._request_buffer.get_requests_count() > 0 - or self._request_buffer.is_adding_to_db() - ): + if not self._request_buffer.is_idle(): return False tools.delay_time(1) diff --git a/feapder/core/spiders/air_spider.py b/feapder/core/spiders/air_spider.py index 70c30112..f51f8930 100644 --- a/feapder/core/spiders/air_spider.py +++ b/feapder/core/spiders/air_spider.py @@ -57,21 +57,16 @@ def distribute_task(self): self._request_buffer.put_request(request, ignore_max_size=False) def all_thread_is_done(self): - for i in range(3): # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性 - # 检测 parser_control 状态 + # Check three times to avoid transient idle states between runtime stages. + for i in range(3): for parser_control in self._parser_controls: - if not parser_control.is_not_task(): + if not parser_control.is_idle(): return False - # 检测 任务队列 状态 if not self._memory_db.empty(): return False - # 检测 item_buffer 状态 - if ( - self._item_buffer.get_items_count() > 0 - or self._item_buffer.is_adding_to_db() - ): + if not self._item_buffer.is_idle(): return False tools.delay_time(1) diff --git a/feapder/db/redisdb.py b/feapder/db/redisdb.py index d882e687..ee4e0cd9 100644 --- a/feapder/db/redisdb.py +++ b/feapder/db/redisdb.py @@ -6,11 +6,11 @@ --------- @author: Boris """ -import os import time from typing import Union, List import redis +from redis.cluster import ClusterNode, RedisCluster from redis.connection import Encoder as _Encoder from redis.exceptions import ConnectionError, TimeoutError from redis.exceptions import DataError @@ -140,29 +140,24 @@ def get_connect(self): startup_nodes = [] for ip_port in ip_ports: ip, port = ip_port.split(":") - startup_nodes.append({"host": ip, "port": port}) + startup_nodes.append(ClusterNode(ip, int(port))) if self._service_name: # log.debug("使用redis哨兵模式") - hosts = [(node["host"], node["port"]) for node in startup_nodes] + hosts = [(node.host, node.port) for node in startup_nodes] sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs) self._redis = sentinel.master_for( self._service_name, password=self._user_pass, db=self._db, - redis_class=redis.StrictRedis, + redis_class=redis.Redis, decode_responses=self._decode_responses, max_connections=self._max_connections, **self._kwargs, ) + self._is_redis_cluster = False else: - try: - from rediscluster import RedisCluster - except ModuleNotFoundError as e: - log.error('请安装 pip install "feapder[all]"') - os._exit(0) - # log.debug("使用redis集群模式") self._redis = RedisCluster( startup_nodes=startup_nodes, @@ -171,11 +166,10 @@ def get_connect(self): max_connections=self._max_connections, **self._kwargs, ) - - self._is_redis_cluster = True + self._is_redis_cluster = True else: ip, port = ip_ports[0].split(":") - self._redis = redis.StrictRedis( + self._redis = redis.Redis( host=ip, port=port, db=self._db, @@ -186,7 +180,7 @@ def get_connect(self): ) self._is_redis_cluster = False else: - self._redis = redis.StrictRedis.from_url( + self._redis = redis.Redis.from_url( self._url, decode_responses=self._decode_responses, **self._kwargs ) self._is_redis_cluster = False @@ -573,7 +567,8 @@ def zexists(self, table, values): if isinstance(values, list): pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for value in values: pipe.zscore(table, value) is_exists_temp = pipe.execute() @@ -773,7 +768,8 @@ def setbit( else: assert len(offsets) == len(values), "offsets值要与values值一一对应" pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for offset, value in zip(offsets, values): pipe.setbit(table, offset, value) @@ -792,7 +788,8 @@ def getbit(self, table, offsets): """ if isinstance(offsets, list): pipe = self._redis.pipeline() - pipe.multi() + if not self._is_redis_cluster: + pipe.multi() for offset in offsets: pipe.getbit(table, offset) diff --git a/feapder/network/proxy_pool_old.py b/feapder/network/proxy_pool_old.py index 2e3bb6c1..fddaabc1 100644 --- a/feapder/network/proxy_pool_old.py +++ b/feapder/network/proxy_pool_old.py @@ -147,7 +147,7 @@ def get_proxy_from_redis(proxy_source_url, **kwargs): @return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'http://xxx.xxx.xxx.xxx:xxx'}] """ - redis_conn = redis.StrictRedis.from_url(proxy_source_url) + redis_conn = redis.Redis.from_url(proxy_source_url) key = kwargs.get("redis_proxies_key") assert key, "从redis中获取代理 需要指定 redis_proxies_key" proxies = redis_conn.zrange(key, 0, -1) diff --git a/feapder/requirements.txt b/feapder/requirements.txt index 21717674..888d27f2 100644 --- a/feapder/requirements.txt +++ b/feapder/requirements.txt @@ -1,21 +1,20 @@ better-exceptions>=0.2.2 -DBUtils>=2.0 -parsel>=1.5.2 +DBUtils>=3.0 +parsel>=1.8.1 PyExecJS>=1.5.1 -pymongo>=3.10.1 -PyMySQL>=0.9.3 -redis>=2.10.6,<4.0.0 -requests>=2.22.0 -selenium>=3.141.0 -bs4>=0.0.1 -ipython>=7.14.0 -bitarray>=1.5.3 -redis-py-cluster>=2.1.0 -cryptography>=3.3.2 -urllib3>=1.25.8 +pymongo>=4.0.0 +PyMySQL>=1.1.0 +redis>=5.0.0,<9.0.0 +requests>=2.31.0 +selenium>=4.10.0 +beautifulsoup4>=4.12.0 +ipython>=8.0.0 +bitarray>=2.8.0 +cryptography>=41.0.0 +urllib3>=2.0.0,<3.0.0 loguru>=0.5.3 influxdb>=5.3.1 pyperclip>=1.8.2 webdriver-manager>=4.0.0 terminal-layout>=2.1.3 -playwright \ No newline at end of file +playwright>=1.40.0 diff --git a/feapder/utils/webdriver/selenium_driver.py b/feapder/utils/webdriver/selenium_driver.py index 9f46d54b..06eb5493 100644 --- a/feapder/utils/webdriver/selenium_driver.py +++ b/feapder/utils/webdriver/selenium_driver.py @@ -8,13 +8,13 @@ @email: boris_liu@foxmail.com """ +import inspect import json import logging import os from typing import Optional, Union, List from selenium import webdriver -from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.firefox import GeckoDriverManager @@ -33,41 +33,7 @@ class SeleniumDriver(WebDriver, RemoteWebDriver): PHANTOMJS = "PHANTOMJS" FIREFOX = "FIREFOX" - __CHROME_ATTRS__ = { - "executable_path", - "port", - "options", - "service_args", - "desired_capabilities", - "service_log_path", - "chrome_options", - "keep_alive", - } - - __EDGE_ATTRS__ = __CHROME_ATTRS__ - - __FIREFOX_ATTRS__ = { - "firefox_profile", - "firefox_binary", - "timeout", - "capabilities", - "proxy", - "executable_path", - "options", - "service_log_path", - "firefox_options", - "service_args", - "desired_capabilities", - "log_path", - "keep_alive", - } - __PHANTOMJS_ATTRS__ = { - "executable_path", - "port", - "desired_capabilities", - "service_args", - "service_log_path", - } + __DRIVER_ATTRS__ = {"keep_alive"} def __init__(self, xhr_url_regexes: list = None, **kwargs): """ @@ -131,41 +97,117 @@ def filter_kwargs(self, kwargs: dict, driver_attrs: set): return data + def get_options(self, default_options, *option_keys): + for option_key in option_keys: + options = self._kwargs.get(option_key) + if options is not None: + return options + + return default_options + + def get_driver_kwargs(self): + return self.filter_kwargs(self._kwargs, self.__DRIVER_ATTRS__) + + def apply_capabilities(self, options, *capability_keys): + for capability_key in capability_keys: + capabilities = self._kwargs.get(capability_key) + if not capabilities: + continue + + for key, value in capabilities.items(): + options.set_capability(key, value) + + return options + + def build_service(self, service_cls, driver_manager_cls=None): + service = self._kwargs.get("service") + if service is not None: + return service + + service_kwargs = {} + service_args = self._kwargs.get("service_args") + if service_args is not None: + service_kwargs["service_args"] = service_args + + port = self._kwargs.get("port") + if port is not None: + service_kwargs["port"] = port + + log_path = self._kwargs.get("service_log_path") + if log_path is None: + log_path = self._kwargs.get("log_path") + if log_path is not None: + log_param = ( + "log_output" + if "log_output" in inspect.signature(service_cls).parameters + else "log_path" + ) + service_kwargs[log_param] = log_path + + if self._executable_path: + return service_cls(self._executable_path, **service_kwargs) + + if self._auto_install_driver and driver_manager_cls is not None: + return service_cls(driver_manager_cls().install(), **service_kwargs) + + if service_kwargs: + return service_cls(**service_kwargs) + + return None + + def create_driver(self, driver_cls, options, service): + kwargs = self.get_driver_kwargs() + if service is not None: + kwargs["service"] = service + + return driver_cls(options=options, **kwargs) + + def get_proxy(self): + return self._proxy() if callable(self._proxy) else self._proxy + + def get_user_agent(self): + return self._user_agent() if callable(self._user_agent) else self._user_agent + def get_driver(self): return self.driver def firefox_driver(self): - if webdriver.__version__ >= "4.0.0": - raise Exception( - f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器" - ) + from selenium.webdriver.firefox.service import Service - firefox_profile = webdriver.FirefoxProfile() - firefox_options = webdriver.FirefoxOptions() - firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX - try: - from selenium.webdriver.firefox.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + firefox_options = self.get_options( + webdriver.FirefoxOptions(), "options", "firefox_options" + ) + firefox_profile = self._kwargs.get("firefox_profile") + if firefox_profile is not None: + firefox_options.profile = firefox_profile + firefox_binary = self._kwargs.get("firefox_binary") + if firefox_binary is not None: + firefox_options.binary_location = ( + getattr(firefox_binary, "path", None) + or getattr(firefox_binary, "_start_cmd", None) + or firefox_binary + ) + self.apply_capabilities(firefox_options, "desired_capabilities", "capabilities") if self._proxy: - proxy = self._proxy() if callable(self._proxy) else self._proxy - firefox_capabilities["marionette"] = True - firefox_capabilities["proxy"] = { - "proxyType": "MANUAL", - "httpProxy": proxy, - "ftpProxy": proxy, - "sslProxy": proxy, - } + proxy = self.get_proxy() + firefox_options.set_capability( + "proxy", + { + "proxyType": "MANUAL", + "httpProxy": proxy, + "ftpProxy": proxy, + "sslProxy": proxy, + }, + ) if self._user_agent: - firefox_profile.set_preference( - "general.useragent.override", - self._user_agent() if callable(self._user_agent) else self._user_agent, + firefox_options.set_preference( + "general.useragent.override", self.get_user_agent() ) if not self._load_images: - firefox_profile.set_preference("permissions.default.image", 2) + firefox_options.set_preference("permissions.default.image", 2) if self._headless: firefox_options.add_argument("--headless") @@ -176,25 +218,8 @@ def firefox_driver(self): for arg in self._custom_argument: firefox_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__) - - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - kwargs.update(executable_path=GeckoDriverManager().install()) - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - kwargs.update(service=Service(GeckoDriverManager().install())) - - driver = webdriver.Firefox( - capabilities=firefox_capabilities, - options=firefox_options, - firefox_profile=firefox_profile, - **kwargs, - ) + service = self.build_service(Service, GeckoDriverManager) + driver = self.create_driver(webdriver.Firefox, firefox_options, service) if self._window_size: driver.set_window_size(*self._window_size) @@ -202,31 +227,22 @@ def firefox_driver(self): return driver def chrome_driver(self): - chrome_options = webdriver.ChromeOptions() + chrome_options = self.get_options( + webdriver.ChromeOptions(), "options", "chrome_options" + ) # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 chrome_options.add_argument("--no-sandbox") - try: - from selenium.webdriver.chrome.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + from selenium.webdriver.chrome.service import Service + + self.apply_capabilities(chrome_options, "desired_capabilities") if self._proxy: - chrome_options.add_argument( - "--proxy-server={}".format( - self._proxy() if callable(self._proxy) else self._proxy - ) - ) + chrome_options.add_argument("--proxy-server={}".format(self.get_proxy())) if self._user_agent: - chrome_options.add_argument( - "user-agent={}".format( - self._user_agent() - if callable(self._user_agent) - else self._user_agent - ) - ) + chrome_options.add_argument("user-agent={}".format(self.get_user_agent())) if not self._load_images: chrome_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} @@ -254,19 +270,8 @@ def chrome_driver(self): for arg in self._custom_argument: chrome_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - kwargs.update(executable_path=ChromeDriverManager().install()) - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - kwargs.update(service=Service(ChromeDriverManager().install())) - - driver = webdriver.Chrome(options=chrome_options, **kwargs) + service = self.build_service(Service, ChromeDriverManager) + driver = self.create_driver(webdriver.Chrome, chrome_options, service) # 隐藏浏览器特征 if self._use_stealth_js: @@ -293,44 +298,30 @@ def chrome_driver(self): ) if self._download_path: - driver.command_executor._commands["send_command"] = ( - "POST", - "/session/$sessionId/chromium/send_command", + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": self._download_path}, ) - params = { - "cmd": "Page.setDownloadBehavior", - "params": {"behavior": "allow", "downloadPath": self._download_path}, - } - driver.execute("send_command", params) return driver def edge_driver(self): - edge_options = webdriver.EdgeOptions() + edge_options = self.get_options( + webdriver.EdgeOptions(), "options", "edge_options" + ) # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium edge_options.add_experimental_option("excludeSwitches", ["enable-automation"]) edge_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 edge_options.add_argument("--no-sandbox") - try: - from selenium.webdriver.edge.service import Service - except (ImportError, ModuleNotFoundError): - Service = None + from selenium.webdriver.edge.service import Service + + self.apply_capabilities(edge_options, "desired_capabilities") if self._proxy: - edge_options.add_argument( - "--proxy-server={}".format( - self._proxy() if callable(self._proxy) else self._proxy - ) - ) + edge_options.add_argument("--proxy-server={}".format(self.get_proxy())) if self._user_agent: - edge_options.add_argument( - "user-agent={}".format( - self._user_agent() - if callable(self._user_agent) - else self._user_agent - ) - ) + edge_options.add_argument("user-agent={}".format(self.get_user_agent())) if not self._load_images: edge_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} @@ -358,19 +349,8 @@ def edge_driver(self): for arg in self._custom_argument: edge_options.add_argument(arg) - kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) - if Service is None: - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - elif self._auto_install_driver: - raise NotImplementedError("edge not support auto install driver") - else: - if self._executable_path: - kwargs.update(service=Service(self._executable_path)) - elif self._auto_install_driver: - raise NotImplementedError("edge not support auto install driver") - - driver = webdriver.Edge(options=edge_options, **kwargs) + service = self.build_service(Service) + driver = self.create_driver(webdriver.Edge, edge_options, service) # 隐藏浏览器特征 if self._use_stealth_js: @@ -397,58 +377,19 @@ def edge_driver(self): ) if self._download_path: - driver.command_executor._commands["send_command"] = ( - "POST", - "/session/$sessionId/chromium/send_command", + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": self._download_path}, ) - params = { - "cmd": "Page.setDownloadBehavior", - "params": {"behavior": "allow", "downloadPath": self._download_path}, - } - driver.execute("send_command", params) return driver def phantomjs_driver(self): - import warnings - - warnings.filterwarnings("ignore") - - service_args = [] - dcap = DesiredCapabilities.PHANTOMJS - - if self._proxy: - service_args.append( - "--proxy=%s" % self._proxy() if callable(self._proxy) else self._proxy - ) - if self._user_agent: - dcap["phantomjs.page.settings.userAgent"] = ( - self._user_agent() if callable(self._user_agent) else self._user_agent - ) - if not self._load_images: - service_args.append("--load-images=no") - - # 添加自定义的配置参数 - if self._custom_argument: - for arg in self._custom_argument: - service_args.append(arg) - - kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__) - - if self._executable_path: - kwargs.update(executable_path=self._executable_path) - - driver = webdriver.PhantomJS( - service_args=service_args, desired_capabilities=dcap, **kwargs + raise NotImplementedError( + "PhantomJS is not supported by Selenium 4. " + "Please use CHROME, EDGE, or FIREFOX." ) - if self._window_size: - driver.set_window_size(self._window_size[0], self._window_size[1]) - - del warnings - - return driver - @property def domain(self): return tools.get_domain(self.url or self.driver.current_url) diff --git a/setup.py b/setup.py index cf4fe542..14542b97 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,8 @@ import setuptools -if version_info < (3, 6, 0): - raise SystemExit("Sorry! feapder requires python 3.6.0 or later.") +if version_info < (3, 9, 0): + raise SystemExit("Sorry! feapder requires python 3.9.0 or later.") with open(join(dirname(__file__), "feapder/VERSION"), "rb") as fh: version = fh.read().decode("ascii").strip() @@ -35,15 +35,15 @@ requires = [ "better-exceptions>=0.2.2", - "DBUtils>=2.0", - "parsel>=1.5.2", - "PyMySQL>=0.9.3", - "redis>=2.10.6,<4.0.0", - "requests>=2.22.0", - "bs4>=0.0.1", - "ipython>=7.14.0", - "cryptography>=3.3.2", - "urllib3>=1.25.8", + "DBUtils>=3.0", + "parsel>=1.8.1", + "PyMySQL>=1.1.0", + "redis>=5.0.0,<9.0.0", + "requests>=2.31.0", + "beautifulsoup4>=4.12.0", + "ipython>=8.0.0", + "cryptography>=41.0.0", + "urllib3>=2.0.0,<3.0.0", "loguru>=0.5.3", "influxdb>=5.3.1", "pyperclip>=1.8.2", @@ -52,15 +52,14 @@ render_requires = [ "webdriver-manager>=4.0.0", - "playwright", - "selenium>=3.141.0", + "playwright>=1.40.0", + "selenium>=4.10.0", ] all_requires = [ - "bitarray>=1.5.3", + "bitarray>=2.8.0", "PyExecJS>=1.5.1", - "pymongo>=3.10.1", - "redis-py-cluster>=2.1.0", + "pymongo>=4.0.0", ] + render_requires setuptools.setup( @@ -69,7 +68,7 @@ author="Boris", license="MIT", author_email="feapder@qq.com", - python_requires=">=3.6", + python_requires=">=3.9", description="feapder是一款支持分布式、批次采集、数据防丢、报警丰富的python爬虫框架", long_description=long_description, long_description_content_type="text/markdown", diff --git a/tests/test_dependency_modernization.py b/tests/test_dependency_modernization.py new file mode 100644 index 00000000..e9a73b2a --- /dev/null +++ b/tests/test_dependency_modernization.py @@ -0,0 +1,115 @@ +import inspect + +from feapder.db.redisdb import RedisDB +from feapder.utils.webdriver.selenium_driver import SeleniumDriver + + +class OldStyleService: + def __init__( + self, executable_path=None, port=0, service_args=None, log_path=None, **kwargs + ): + self.executable_path = executable_path + self.port = port + self.service_args = service_args + self.log_path = log_path + + +class NewStyleService: + def __init__( + self, executable_path=None, port=0, service_args=None, log_output=None, **kwargs + ): + self.executable_path = executable_path + self.port = port + self.service_args = service_args + self.log_output = log_output + + +class FakeBrowser: + def set_window_size(self, *args): + self.window_size = args + + +def make_selenium_driver(**kwargs): + driver = object.__new__(SeleniumDriver) + driver._kwargs = kwargs + driver._executable_path = kwargs.pop("executable_path", "/tmp/driver") + driver._auto_install_driver = False + driver._proxy = None + driver._user_agent = None + driver._load_images = True + driver._headless = False + driver._custom_argument = None + driver._window_size = None + return driver + + +def test_selenium_service_log_path_supports_old_and_new_service_api(): + driver = make_selenium_driver( + service_log_path="/tmp/webdriver.log", service_args=["--verbose"], port=1234 + ) + + old_service = driver.build_service(OldStyleService) + new_service = driver.build_service(NewStyleService) + + assert old_service.executable_path == "/tmp/driver" + assert old_service.service_args == ["--verbose"] + assert old_service.port == 1234 + assert old_service.log_path == "/tmp/webdriver.log" + assert new_service.executable_path == "/tmp/driver" + assert new_service.service_args == ["--verbose"] + assert new_service.port == 1234 + assert new_service.log_output == "/tmp/webdriver.log" + + +def test_selenium_firefox_binary_maps_to_options_binary_location(): + driver = make_selenium_driver(firefox_binary="/tmp/firefox") + captured = {} + + def create_driver(driver_cls, options, service): + captured["options"] = options + captured["service"] = service + return FakeBrowser() + + driver.create_driver = create_driver + driver.build_service = lambda *args, **kwargs: None + + assert driver.firefox_driver() is not None + assert captured["options"].binary_location == "/tmp/firefox" + assert captured["service"] is None + + +def test_selenium_driver_kwargs_keep_public_constructor_args_internal(): + driver = make_selenium_driver( + keep_alive=False, + executable_path="/tmp/driver", + desired_capabilities={"acceptInsecureCerts": True}, + service_args=["--verbose"], + ) + + assert driver.get_driver_kwargs() == {"keep_alive": False} + + +def test_redisdb_public_api_signatures_are_preserved(): + expected = { + "__init__": [ + "self", + "ip_ports", + "db", + "user_pass", + "url", + "decode_responses", + "service_name", + "max_connections", + "kwargs", + ], + "from_url": ["url"], + "zadd": ["self", "table", "values", "prioritys"], + "zget": ["self", "table", "count", "is_pop"], + "zexists": ["self", "table", "values"], + "setbit": ["self", "table", "offsets", "values"], + "getbit": ["self", "table", "offsets"], + } + + for method_name, parameters in expected.items(): + signature = inspect.signature(getattr(RedisDB, method_name)) + assert list(signature.parameters) == parameters diff --git a/tests/test_parser_control_runtime.py b/tests/test_parser_control_runtime.py new file mode 100644 index 00000000..bf9b76f4 --- /dev/null +++ b/tests/test_parser_control_runtime.py @@ -0,0 +1,306 @@ +import pytest + +import feapder.setting as setting +from feapder.core.parser_control import AirSpiderParserControl, ParserControl +from feapder.network.request import Request +from feapder.network.item import Item + + +class FakeResponse: + browser = None + + def __str__(self): + return "" + + +class FakeParser: + name = "FakeParser" + + def __init__(self): + self.validated = [] + self.parsed = [] + + def download_midware(self, request): + return None + + def validate(self, request, response): + self.validated.append((request, response)) + return True + + def parse(self, request, response): + self.parsed.append((request, response)) + return [Item(title="ok")] + + def exception_request(self, request, response, e): + return [request] + + def failed_request(self, request, response, e): + return [request] + + +class FakeParserRecordingExceptions(FakeParser): + def __init__(self): + super().__init__() + self.exception_requests = [] + + def exception_request(self, request, response, e): + self.exception_requests.append((request, response, e)) + return [request] + + +class FakeRequestBuffer: + def __init__(self): + self.requests = [] + self.deleted = [] + self.failed = [] + + def put_request(self, request): + self.requests.append(request) + + def put_del_request(self, request): + self.deleted.append(request) + + def put_failed_request(self, request): + self.failed.append(request) + + +class FakeItemBuffer: + def __init__(self): + self.items = [] + + def put_item(self, item): + self.items.append(item) + + +def return_none_response(): + return None + + +def build_control(): + control = object.__new__(ParserControl) + control._parsers = [] + control._request_buffer = FakeRequestBuffer() + control._item_buffer = FakeItemBuffer() + return control + + +def build_air_control(): + control = object.__new__(AirSpiderParserControl) + control._parsers = [] + control._request_buffer = FakeRequestBuffer() + control._item_buffer = FakeItemBuffer() + return control + + +def test_find_parser_by_request_parser_name(): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + + assert control._find_parser(Request(parser_name="FakeParser")) is parser + + +def test_find_parser_returns_none_for_missing_parser(): + control = build_control() + control._parsers = [FakeParser()] + + assert control._find_parser(Request(parser_name="OtherParser")) is None + + +def test_run_callback_uses_named_callback(): + parser = FakeParser() + parser.custom = lambda request, response: [Item(title="custom")] + control = build_control() + request = Request(callback="custom") + response = FakeResponse() + + results = control._run_callback(parser, request, response) + + assert len(results) == 1 + assert results[0]["title"] == "custom" + + +def test_finish_request_prefers_item_buffer_for_item_results(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=True, + del_request_redis_after_request_to_db=True, + ) + + assert control._item_buffer.items == ["raw-request"] + assert control._request_buffer.deleted == [] + + +def test_finish_request_deletes_via_request_buffer_for_request_results(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=False, + del_request_redis_after_request_to_db=True, + ) + + assert control._item_buffer.items == [] + assert control._request_buffer.deleted == ["raw-request"] + + +def test_finish_request_deletes_unclaimed_request_by_default(): + control = build_control() + + control._finish_request( + request_redis="raw-request", + del_request_redis_after_item_to_db=False, + del_request_redis_after_request_to_db=False, + ) + + assert control._request_buffer.deleted == ["raw-request"] + + +def test_deal_request_uses_request_returned_by_download_middleware(monkeypatch): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + response = FakeResponse() + replacement_request = Request( + "https://replacement.example.com", + parser_name="FakeParser", + auto_request=True, + ) + + def replace_request(request): + return replacement_request, response + + request = Request( + "https://example.com", + parser_name="FakeParser", + download_midware=replace_request, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request({"request_obj": request, "request_redis": "raw-request"}) + + assert parser.validated == [(replacement_request, response)] + assert parser.parsed == [(replacement_request, response)] + + +def test_deal_request_handles_fetch_exception_on_replacement_request(monkeypatch): + parser = FakeParserRecordingExceptions() + control = build_control() + control._parsers = [parser] + replacement_request = Request( + "https://replacement.example.com", + parser_name="FakeParser", + auto_request=True, + ) + + def raise_fetch_error(): + raise RuntimeError("replacement fetch failed") + + replacement_request.get_response = raise_fetch_error + + def replace_request(request): + return replacement_request + + request = Request( + "https://example.com", + parser_name="FakeParser", + download_midware=replace_request, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + monkeypatch.setattr(setting, "LOG_LEVEL", "INFO") + + control.deal_request({"request_obj": request, "request_redis": None}) + + assert parser.exception_requests + assert parser.exception_requests[0][0] is replacement_request + + +def test_deal_request_dispatches_item_and_marks_request_for_item_delete(monkeypatch): + parser = FakeParser() + control = build_control() + control._parsers = [parser] + response = FakeResponse() + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=False, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request({"request_obj": request, "request_redis": "raw-request"}) + + assert len(control._item_buffer.items) == 2 + assert isinstance(control._item_buffer.items[0], Item) + assert control._item_buffer.items[1] == "raw-request" + assert control._request_buffer.deleted == [] + + +def test_air_parser_control_uses_direct_sync_request_payload(): + control = build_air_control() + request = Request("https://example.com") + + assert control._make_sync_request_payload(request) is request + + +def test_air_parser_control_dispatches_item_without_redis_finish(monkeypatch): + parser = FakeParser() + control = build_air_control() + control._parsers = [parser] + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=False, + ) + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request(request) + + assert len(control._item_buffer.items) == 1 + assert isinstance(control._item_buffer.items[0], Item) + assert control._request_buffer.deleted == [] + + +def test_air_parser_control_allows_none_download_response(monkeypatch): + parser = FakeParserRecordingExceptions() + control = build_air_control() + control._parsers = [parser] + request = Request( + "https://example.com", + parser_name="FakeParser", + auto_request=True, + ) + request.get_response = return_none_response + + monkeypatch.setattr(control, "record_download_status", lambda status, spider: None) + monkeypatch.setattr(control, "_sleep_after_request", lambda: None) + + control.deal_request(request) + + assert parser.exception_requests == [] + assert parser.validated == [(request, None)] + assert parser.parsed == [(request, None)] + + +def test_air_parser_control_finish_request_rejects_redis_deletion_inputs(): + control = build_air_control() + + control._finish_request(None, False, False) + + with pytest.raises(AssertionError): + control._finish_request("raw-request", False, False) + + with pytest.raises(AssertionError): + control._finish_request(None, True, False) + + with pytest.raises(AssertionError): + control._finish_request(None, False, True) diff --git a/tests/test_result_dispatcher.py b/tests/test_result_dispatcher.py new file mode 100644 index 00000000..cd4e29df --- /dev/null +++ b/tests/test_result_dispatcher.py @@ -0,0 +1,169 @@ +import pytest + +from feapder.core.result_dispatcher import ResultDispatcher +from feapder.network.item import Item, UpdateItem +from feapder.network.request import Request + + +class FakeParser: + name = "FakeParser" + + +class FakeRequestBuffer: + def __init__(self): + self.requests = [] + + def put_request(self, request): + self.requests.append(request) + + +class FakeItemBuffer: + def __init__(self): + self.items = [] + + def put_item(self, item): + self.items.append(item) + + +def test_dispatcher_routes_async_request_and_sets_parser_name(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + calls = [] + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=calls.append, + sync_request_factory=lambda request: {"request_obj": request, "request_redis": None}, + ) + next_request = Request("https://example.com") + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com", callback="parse"), + results=[next_request], + ) + + assert next_request.parser_name == "FakeParser" + assert request_buffer.requests == [next_request] + assert item_buffer.items == [] + assert calls == [] + assert result.del_request_redis_after_request_to_db is True + assert result.del_request_redis_after_item_to_db is False + + +def test_dispatcher_routes_sync_request_to_deal_request(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + calls = [] + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=calls.append, + sync_request_factory=lambda request: {"request_obj": request, "request_redis": None}, + ) + next_request = Request("https://example.com", request_sync=True) + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[next_request], + ) + + assert calls == [{"request_obj": next_request, "request_redis": None}] + assert request_buffer.requests == [] + assert result.del_request_redis_after_request_to_db is False + + +def test_dispatcher_routes_item_and_update_item_to_item_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + item = Item(title="one") + update_item = UpdateItem(id=1, title="two") + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[item, update_item], + ) + + assert item_buffer.items == [item, update_item] + assert request_buffer.requests == [] + assert result.del_request_redis_after_item_to_db is True + + +def test_dispatcher_routes_callable_after_item_to_item_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + callback = lambda: None + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[Item(title="one"), callback], + ) + + assert item_buffer.items[-1] is callback + assert request_buffer.requests == [] + assert result.del_request_redis_after_item_to_db is True + + +def test_dispatcher_routes_callable_without_prior_item_to_request_buffer(): + request_buffer = FakeRequestBuffer() + item_buffer = FakeItemBuffer() + dispatcher = ResultDispatcher( + request_buffer=request_buffer, + item_buffer=item_buffer, + deal_request=lambda request: None, + ) + callback = lambda: None + + result = dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[callback], + ) + + assert request_buffer.requests == [callback] + assert item_buffer.items == [] + assert result.del_request_redis_after_request_to_db is True + + +def test_dispatcher_rejects_callable_when_disabled(): + dispatcher = ResultDispatcher( + request_buffer=FakeRequestBuffer(), + item_buffer=FakeItemBuffer(), + deal_request=lambda request: None, + allow_callable=False, + ) + + with pytest.raises(TypeError, match="FakeParser.parse result expect Request or Item"): + dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[lambda: None], + ) + + +def test_dispatcher_rejects_invalid_result_type(): + dispatcher = ResultDispatcher( + request_buffer=FakeRequestBuffer(), + item_buffer=FakeItemBuffer(), + deal_request=lambda request: None, + ) + + with pytest.raises(TypeError, match="FakeParser.parse result expect Request"): + dispatcher.dispatch( + parser=FakeParser(), + request=Request("https://root.example.com"), + results=[object()], + ) diff --git a/tests/test_runtime_state.py b/tests/test_runtime_state.py new file mode 100644 index 00000000..0c8a9bc0 --- /dev/null +++ b/tests/test_runtime_state.py @@ -0,0 +1,56 @@ +from feapder.core.runtime_state import RuntimeState + + +def test_runtime_state_starts_idle_and_running(): + state = RuntimeState() + + assert state.is_idle is True + assert state.is_stop_requested is False + assert state.busy_count == 0 + + +def test_runtime_state_tracks_busy_count(): + state = RuntimeState() + + state.mark_busy() + state.mark_busy() + + assert state.is_idle is False + assert state.busy_count == 2 + + state.mark_idle() + state.mark_idle() + + assert state.is_idle is True + assert state.busy_count == 0 + + +def test_runtime_state_does_not_go_negative(): + state = RuntimeState() + + state.mark_idle() + + assert state.busy_count == 0 + assert state.is_idle is True + + +def test_runtime_state_busy_context_resets_on_exception(): + state = RuntimeState() + + try: + with state.busy(): + assert state.is_idle is False + raise RuntimeError("boom") + except RuntimeError: + pass + + assert state.is_idle is True + + +def test_runtime_state_stop_request_is_sticky(): + state = RuntimeState() + + state.request_stop() + state.request_stop() + + assert state.is_stop_requested is True diff --git a/tests/test_runtime_status.py b/tests/test_runtime_status.py new file mode 100644 index 00000000..c0701405 --- /dev/null +++ b/tests/test_runtime_status.py @@ -0,0 +1,240 @@ +import collections +from queue import Queue + +from feapder.buffer.item_buffer import ItemBuffer +from feapder.buffer.request_buffer import RequestBuffer +from feapder.core.collector import Collector +from feapder.core.scheduler import Scheduler +from feapder.core.runtime_state import RuntimeState +from feapder.network.item import Item + + +class FakeRedis: + def __init__(self, count=0): + self.count = count + + def zget_count(self, table): + return self.count + + +class FakeDoneComponent: + def __init__(self, idle=True): + self._idle = idle + + def is_idle(self): + return self._idle + + +def build_collector(local_count=0, backend_count=0, busy=False): + collector = object.__new__(Collector) + collector._state = RuntimeState() + collector._db = FakeRedis(backend_count) + collector._tab_requests = "test:z_requests" + collector._todo_requests = Queue() + collector._is_collector_task = busy + for i in range(local_count): + collector._todo_requests.put({"request_obj": i, "request_redis": str(i)}) + return collector + + +def build_request_buffer(request_count=0, delete_count=0, flushing=False): + buffer = object.__new__(RequestBuffer) + buffer._state = RuntimeState() + buffer._requests_deque = collections.deque(range(request_count)) + buffer._del_requests_deque = collections.deque(range(delete_count)) + buffer._is_adding_to_db = flushing + return buffer + + +def build_item_buffer(item_count=0, flushing=False): + buffer = object.__new__(ItemBuffer) + buffer._state = RuntimeState() + buffer._items_queue = Queue() + buffer._is_adding_to_db = flushing + for i in range(item_count): + buffer._items_queue.put(i) + return buffer + + +def test_collector_pending_count_includes_local_queue_first(): + collector = build_collector(local_count=2, backend_count=5) + + assert collector.pending_count() == 2 + assert collector.is_idle() is False + + +def test_collector_pending_count_uses_backend_when_local_empty(): + collector = build_collector(local_count=0, backend_count=3) + + assert collector.pending_count() == 3 + assert collector.is_idle() is False + + +def test_collector_idle_when_not_collecting_and_empty(): + collector = build_collector(local_count=0, backend_count=0, busy=False) + + assert collector.pending_count() == 0 + assert collector.is_idle() is True + + +def test_request_buffer_pending_count_includes_writes_and_deletes(): + buffer = build_request_buffer(request_count=2, delete_count=1) + + assert buffer.pending_count() == 3 + assert buffer.is_idle() is False + + +def test_request_buffer_idle_when_empty_and_not_flushing(): + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + + assert buffer.pending_count() == 0 + assert buffer.is_idle() is True + + +def test_item_buffer_idle_when_queue_empty_and_not_flushing(): + buffer = build_item_buffer(item_count=0, flushing=False) + + assert buffer.pending_count() == 0 + assert buffer.is_idle() is True + + +def test_item_buffer_not_idle_while_flushing(): + buffer = build_item_buffer(item_count=0, flushing=True) + + assert buffer.is_idle() is False + + +def test_scheduler_uses_component_idle_methods(monkeypatch): + monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None) + scheduler = object.__new__(Scheduler) + scheduler._collector = FakeDoneComponent(idle=True) + scheduler._parser_controls = [FakeDoneComponent(idle=True)] + scheduler._item_buffer = FakeDoneComponent(idle=True) + scheduler._request_buffer = FakeDoneComponent(idle=True) + + assert scheduler.all_thread_is_done() is True + + +def test_scheduler_waits_for_busy_parser(monkeypatch): + monkeypatch.setattr("feapder.core.scheduler.tools.delay_time", lambda seconds: None) + scheduler = object.__new__(Scheduler) + scheduler._collector = FakeDoneComponent(idle=True) + scheduler._parser_controls = [FakeDoneComponent(idle=False)] + scheduler._item_buffer = FakeDoneComponent(idle=True) + scheduler._request_buffer = FakeDoneComponent(idle=True) + + assert scheduler.all_thread_is_done() is False + + +def test_component_run_resets_stop_state_before_loop(monkeypatch): + collector = build_collector(local_count=0, backend_count=0) + collector._state.request_stop() + calls = {"count": 0} + + def stop_after_one_input(): + calls["count"] += 1 + collector._state.request_stop() + + monkeypatch.setattr(collector, "_Collector__input_data", stop_after_one_input) + + collector.run() + + assert calls["count"] == 1 + assert collector.is_stopped() is True + + +def test_request_buffer_not_idle_during_delete_only_flush(): + class InspectingDB: + def __init__(self, buffer): + self.buffer = buffer + self.idle_during_zrem = None + + def zrem(self, table, values): + self.idle_during_zrem = self.buffer.is_idle() + + buffer = build_request_buffer(request_count=0, delete_count=1, flushing=False) + buffer._table_request = "test:z_requests" + buffer._db = InspectingDB(buffer) + + buffer.flush() + + assert buffer._db.idle_during_zrem is False + assert buffer.is_idle() is True + + +def test_request_buffer_marks_noop_flush_attempt_busy_while_checking_queue(): + class InspectingEmptyDeque: + def __init__(self, buffer): + self.buffer = buffer + self.busy_observations = [] + + def __bool__(self): + self.busy_observations.append(self.buffer.is_adding_to_db()) + return False + + def __len__(self): + return 0 + + def popleft(self): + raise AssertionError("empty queue should not be popped") + + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + buffer._requests_deque = InspectingEmptyDeque(buffer) + + buffer.flush() + + assert buffer._requests_deque.busy_observations + assert all(buffer._requests_deque.busy_observations) + assert buffer.is_idle() is True + + +def test_request_buffer_flush_resets_flag_when_zadd_raises(): + class ExplodingDB: + def zadd(self, table, values, prioritys=0): + raise RuntimeError("write failed") + + request = type( + "FakeRequest", + (), + { + "priority": 300, + "filter_repeat": False, + "url": "https://example.com", + "to_dict": {"url": "https://example.com"}, + }, + )() + buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False) + buffer._db = ExplodingDB() + buffer._table_request = "test:z_requests" + buffer._table_failed_request = "test:z_failed_requests" + buffer._requests_deque.append(request) + + buffer.flush() + + assert buffer.is_adding_to_db() is False + + +def test_item_buffer_flush_resets_flag_when_export_raises(monkeypatch): + monkeypatch.setattr("feapder.buffer.item_buffer.setting.ITEM_FILTER_ENABLE", False) + item = Item(title="boom") + buffer = build_item_buffer(item_count=0, flushing=False) + buffer._items_queue.put(item) + buffer._redis_key = "test" + buffer._task_table = None + buffer._item_tables = {} + buffer._item_update_keys = {} + buffer._item_pipelines = {} + buffer._pipelines = [] + buffer._have_mysql_pipeline = True + buffer._mysql_pipeline = None + buffer.export_retry_times = 0 + buffer.export_falied_times = 0 + + def raise_export(table, datas, is_update=False, update_keys=(), used_pipelines=None): + raise RuntimeError("export failed") + + buffer._ItemBuffer__export_to_db = raise_export + + buffer.flush() + + assert buffer.is_adding_to_db() is False