Python 实现闲鱼商品列表批量采集,接口异常重试机制搭建

admin11小时前闲鱼API6
在二手货源比价、反向海淘货源抓取、多平台进销存系统场景中,经常需要批量拉取闲鱼搜索商品列表数据。直接循环请求极易遇到接口超时、签名失效、限流 429、服务端 5xx 错误、网络波动等问题,单次报错直接中断整批采集任务。本文基于 Python 实现稳定批量采集逻辑,封装通用重试装饰器、请求会话池、分页循环采集、异常分级处理、失败任务落库重试,适配闲鱼商品列表接口调用场景,可直接集成到 SAAS 货源系统。
重要提示:本文仅做接口调用技术逻辑演示,请勿高频恶意爬取、突破平台访问限制;商业使用优先对接官方开放 API,逆向接口存在封号、风控风险,需控制请求速率。

二、整体技术架构

  1. 请求层:requests 持久 Session 会话,统一请求头、签名、Cookie 封装

  2. 重试层:自定义带次数、间隔、异常过滤的重试装饰器

  3. 采集业务层:分页循环拉取商品列表,自动翻页终止判断

  4. 数据处理层:JSON 解析、结构化清洗商品基础字段

  5. 容错持久层:失败页码写入临时列表,二次兜底重试

  6. 限速层:随机休眠防风控,控制 QPS

依赖安装
bash
运行
pip install requests

三、核心代码实现

3.1 通用异常重试装饰器

支持自定义重试次数、间隔时间、只针对网络 / 服务异常重试,主动跳过 403 封禁、参数错误这类不可恢复错误
python
运行
import timeimport randomimport requestsfrom functools import wrapsdef api_retry(max_retry: int = 3, sleep_base: float = 1.5):
    """
    接口重试装饰器
    :param max_retry: 最大重试次数
    :param sleep_base: 基础休眠秒数
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retry_count = 0
            while retry_count <= max_retry:
                try:
                    return func(*args, **kwargs)
                except (requests.exceptions.Timeout,
                        requests.exceptions.ConnectionError,
                        requests.exceptions.SSLError) as net_err:
                    # 网络类异常,执行重试
                    retry_count += 1
                    if retry_count > max_retry:
                        raise Exception(f"网络异常耗尽重试次数: {str(net_err)}")
                    sleep_time = sleep_base + random.uniform(0.3, 1.2)
                    print(f"【网络异常】第{retry_count}次重试,休眠{round(sleep_time,2)}s")
                    time.sleep(sleep_time)
                except Exception as e:
                    # 业务异常(403、参数错误、签名失效)不重试,直接抛出
                    raise e        return wrapper    return decorator

3.2 闲鱼列表采集封装类

统一管理 Session、请求头、分页采集、状态码判断、数据解析、失败页缓存
python
运行
class XianYuSpider:
    def __init__(self):
        self.session = requests.Session()
        # 替换为你抓包拿到的真实headers、token、sign、cookie
        self.base_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...",
            "Referer": "https://s.2.taobao.com/list/list.htm",
            "Cookie": "你的登录Cookie",
            "token": "接口鉴权token",
            "sign": "动态签名值"
        }
        self.fail_page_list = []  # 存储采集失败页码,后续兜底重试

    @api_retry(max_retry=3, sleep_base=2)
    def fetch_page(self, keyword: str, page_num: int, page_size: int = 40):
        """
        采集单页商品列表
        :param keyword: 搜索关键词
        :param page_num: 页码
        :param page_size: 每页条数
        :return: 解析后的商品数组
        """
        # 闲鱼商品列表接口地址(替换抓包真实API)
        api_url = "https://api.2.taobao.com/xxx/search/list"
        params = {
            "q": keyword,
            "page": page_num,
            "pageSize": page_size,
            "sort": "default"
        }
        resp = self.session.get(
            url=api_url,
            headers=self.base_headers,
            params=params,
            timeout=8
        )
        # 状态码分级处理
        if resp.status_code == 429:
            raise Exception("接口限流429,暂停采集")
        if resp.status_code == 403:
            raise Exception("403封禁,Cookie/Token失效")
        if resp.status_code >= 500:
            raise Exception(f"服务端异常{resp.status_code}")
        if resp.status_code != 200:
            raise Exception(f"异常状态码:{resp.status_code}")

        json_data = resp.json()
        # 校验接口业务code(闲鱼接口一般自带code状态)
        if json_data.get("code") != 0:
            raise Exception(f"接口业务错误:{json_data.get('msg')}")
        item_list = json_data.get("data", {}).get("items", [])
        return item_list    def parse_item(self, item: dict):
        """结构化清洗单条商品数据"""
        return {
            "item_id": item.get("itemId"),
            "title": item.get("title"),
            "price": item.get("price"),
            "original_price": item.get("originalPrice"),
            "seller_nick": item.get("sellerNick"),
            "location": item.get("area"),
            "pic_url": item.get("mainPic"),
            "publish_time": item.get("publishTime"),
            "is_new": item.get("isNew")
        }

    def batch_crawl(self, keyword: str, max_page: int = 10):
        """批量分页采集主逻辑"""
        all_goods = []
        print(f"开始批量采集【{keyword}】,最大页数{max_page}")
        for page in range(1, max_page + 1):
            try:
                item_raw = self.fetch_page(keyword, page)
                if not item_raw:
                    print(f"第{page}页无商品,提前终止分页")
                    break
                # 清洗入库结构
                clean_items = [self.parse_item(i) for i in item_raw]
                all_goods.extend(clean_items)
                print(f"第{page}页采集成功,获取{len(clean_items)}件商品")
                # 随机限速休眠,防止风控
                time.sleep(random.uniform(0.8, 1.8))
            except Exception as e:
                print(f"第{page}页采集失败:{str(e)}")
                self.fail_page_list.append(page)
                # 限流直接中断整批任务,避免加重风控
                if "429" in str(e):
                    print("触发限流,停止所有分页采集")
                    break
        print(f"一轮采集完成,共获取商品{len(all_goods)}条,失败页码:{self.fail_page_list}")
        return all_goods    def retry_failed_pages(self, keyword: str):
        """兜底重试失败页码"""
        if not self.fail_page_list:
            print("无失败页面无需重试")
            return []
        print(f"开始兜底重试{len(self.fail_page_list)}个失败页码")
        retry_goods = []
        temp_fail = []
        for p in self.fail_page_list:
            try:
                items = self.fetch_page(keyword, p)
                clean = [self.parse_item(i) for i in items]
                retry_goods.extend(clean)
                print(f"兜底重试{p}页成功")
                time.sleep(random.uniform(1, 2))
            except Exception as e:
                temp_fail.append(p)
                print(f"兜底重试{p}页仍然失败:{e}")
        self.fail_page_list = temp_fail        return retry_goods

3.3 调用示例

python
运行
if __name__ == "__main__":
    spider = XianYuSpider()
    # 第一轮批量采集
    goods_data = spider.batch_crawl(keyword="笔记本电脑", max_page=8)
    # 兜底重试失败页
    retry_data = spider.retry_failed_pages(keyword="笔记本电脑")
    # 合并全部有效数据
    final_data = goods_data + retry_data    print(f"最终有效商品总数:{len(final_data)}")

    # 可扩展:写入csv/数据库
    # import pandas as pd
    # pd.DataFrame(final_data).to_csv("闲鱼商品列表.csv", index=False, encoding="utf-8-sig")

四、重试机制分层设计详解

4.1 第一层:装饰器即时重试

只针对瞬时故障:网络断开、SSL 错误、超时、服务器临时波动,最多 3 次阶梯休眠重试;对不可逆故障不重试:403 账号封禁、Token 过期、参数错误、接口返回业务报错,直接抛出进入失败列表。

4.2 第二层:业务失败页缓存兜底

一轮分页跑完后,所有报错页码存入fail_page_list,单独开启一轮低频率兜底重试;适合偶发的临时抖动,大幅提升整体采集完整度。

4.3 第三层:限流熔断保护

识别 429 限流状态时,立刻终止全部分页循环,不再消耗请求次数,避免账号被风控封禁,是系统安全屏障。

五、生产环境优化方案

  1. 签名动态更新

    闲鱼接口 sign 大多是实时生成,可接入 JS 引擎执行签名算法,封装自动刷新 sign 函数,放到请求前置逻辑。

  2. 多账号 Session 池

    构建多个登录账号会话,当某个 Session 返回 403 时自动切换备用账号,分布式分摊请求压力。

  3. 异步队列改造

    高并发场景替换为aiohttp异步请求 + celery 任务队列,重试逻辑迁移到任务失败回调,适合 SAAS 多客户同时抓取。

  4. 持久化存储

    失败页码、商品数据存入 MySQL/Redis,支持中断续采、定时增量更新,不用每次从头采集。

  5. 代理 IP 池接入

    大规模采集搭配短效代理池,单 IP 请求频率严格控制,降低 IP 封禁概率。

  6. 日志完整埋点

    替换 print 为 logging 模块,记录每页耗时、重试次数、异常堆栈,方便线上排查稳定性问题。

六、风险与合规提醒

  1. 闲鱼官方有开放平台渠道,商用系统优先申请官方 API 接口,稳定无风控;

  2. 逆向抓包接口属于非官方调用,禁止 7×24 小时高频轮询批量爬取;

  3. 禁止抓取用户隐私信息、批量倒卖采集数据,遵守《网络安全法》与平台用户协议;

  4. 代码仅做技术学习演示,上线商用务必降低采集频率、增加充足休眠间隔。

七、拓展延伸

基于这套重试架构可以无缝迁移:
  1. 扩展调用闲鱼商品详情接口,复用同一套重试装饰器;

  2. 对接淘宝、1688、京东商品列表采集,仅替换接口地址与解析规则;

  3. 集成到反向海淘货源同步系统,定时拉取二手货源价格做比价排序。


相关文章

闲鱼商品采集API商品列表API店铺商品API

 item_get 获取某鱼商品详情item_search 关键字搜索商品列表item_search_shop 获取店铺所有商品列表通过item_search搜索获取打印机的商品,返回商品i...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。