测试函数

投稿 2026-03-09 8:45 点击数: 1

Python实战:打造你的比特币价格实时推送系统


在数字货币波动的世界里,比特币的价格牵动着无数投资者的心,手动刷新交易所查看价格既耗时又容易错过最佳时机,有没有一种方法能让我们实时掌握比特币价格变化,并在关键时刻收到提醒呢?答案是肯定的!利用Python,我们可以轻松构建一个比特币价格实时推送系统,本文将详细介绍如何使用Python获取比特币价格,并通过多种方式实现推送功能。

为什么选择Python

Python作为一种简洁、易学且功能强大的编程语言,在金融数据分析和自动化领域应用广泛,它拥有丰富的第三方库,能够轻松处理网络请求、数据解析和消息发送,是构建此类自动化系统的理想选择。

准备工作:环境与库的搭建

在开始编写代码之前,我们需要准备以下工具和库:

  1. Python环境:确保你的电脑上已安装Python(建议3.6及以上版本)。
  2. 必要的Python库
    • requests:用于发送HTTP请求,获取比特币价格数据。
    • pushovertelegram-botsmtp(用于邮件):用于实现消息推送功能,这里我们以requestspushover(一个流行的推送服务)为例,邮件推送可借助smtplib库实现。
  3. 获取API密钥
    • 我们需要一个提供比特币价格API的服务,许多免费的API可供选择,例如CoinGeckoCoinMarketCap等,这里我们以CoinGecko的公开API为例,它无需API Key即可使用。
    • 如果选择pushover,需要在其官网注册并获取用户密钥和应用API令牌。

安装所需库,可以在命令行中运行:

pip install requests
pip install pushover-complete  # 或者其他你选择的推送库

核心步骤:获取比特币价格

我们需要编写一个Python函数,从API获取比特币的最新价格。CoinGecko API提供了一个简单易用的接口。

import requests
import json
def get_bitcoin_price():
    """
    从CoinGecko API获取比特币价格
    :return: 比特币当前价格(美元),如果获取失败则返回None
    """
    url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
    try:
        response = requests.get(url)
        response.raise_for_status()  # 如果请求失败则抛出异常
        data = response.json()
        price = data['bitcoin']['usd']
        return price
    except requests.exceptions.RequestException as e:
        print(f"获取价格时发生错误: {e}")
        return None
    except (KeyError, json.JSONDecodeError) as e:
        print(f"解析价格数据时发生错误: {e}")
        return None
if __name__ == "__main__":
    current_price = get_bitcoin_price()
    if current_price:
        print(f"当前比特币价格: ${current_price:.2f}")

这段代码定义了get_bitcoin_price函数,它发送GET请求到CoinGecko API,解析返回的JSON数据,并提取出比特币对美元的价格,我们加入了错误处理,以应对网络问题或API返回异常的情况。

实现价格推送功能

获取到价格后,下一步就是将其推送到我们的设备上,这里介绍几种常见的推送方式:

使用Pushover推送(推荐,移动端即时通知)

Pushover是一个简单易用的推送通知服务,支持iOS、Android等平台。

  1. 注册Pushover账号并获取API Token和User Key。
  2. 安装pushover-complete库。
  3. 编写推送函数:
from pushover import Pushover
def push_notification(message, api_token, user_key):
    """
    使用Pushover发送通知
    :param message: 推送消息内容
    :param api_token: Pushover应用API Token
    :param user_key: Pushover用户Key
    """
    try:
        p = Pushover(api_token)
        p.message(message)
        p.user(user_key)
        p.send()
        print("推送成功!")
    except Exception as e:
        print(f"推送失败: {e}")
# 替换为你的Pushover API Token和User Key
PUSHOVER_API_TOKEN = "your_api_token_here"
PUSHOVER_USER_KEY = "your_user_key_here"
if __name__ == "__main__":
    p
随机配图
rice = get_bitcoin_price() if price: notification_message = f"比特币价格更新: ${price:.2f}" push_notification(notification_message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)

邮件推送

Python内置的smtplib库可以用来发送邮件。

import smtplib
from email.mime.text import MIMEText
def send_email_notification(message, smtp_server, smtp_port, sender_email, sender_password, receiver_email):
    """
    发送邮件通知
    :param message: 邮件内容
    :param smtp_server: SMTP服务器地址,如 'smtp.gmail.com'
    :param smtp_port: SMTP服务器端口,如 587
    :param sender_email: 发件人邮箱
    :param sender_password: 发件人邮箱密码(部分邮箱需要应用专用密码)
    :param receiver_email: 收件人邮箱
    """
    try:
        msg = MIMEText(message)
        msg['Subject'] = '比特币价格更新'
        msg['From'] = sender_email
        msg['To'] = receiver_email
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()  # 启用TLS加密
            server.login(sender_email, sender_password)
            server.sendmail(sender_email, receiver_email, msg.as_string())
        print("邮件发送成功!")
    except Exception as e:
        print(f"邮件发送失败: {e}")
# 替换为你的邮箱信息(注意:使用Gmail等可能需要生成应用专用密码)
SMTP_SERVER = 'smtp.example.com'  # 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@example.com'
SENDER_PASSWORD = 'your_email_password_or_app_password'
RECEIVER_EMAIL = 'receiver_email@example.com'
if __name__ == "__main__":
    price = get_bitcoin_price()
    if price:
        email_message = f"当前比特币价格: ${price:.2f}"
        send_email_notification(email_message, SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, SENDER_PASSWORD, RECEIVER_EMAIL)

其他推送方式

  • Telegram Bot:可以创建一个Telegram机器人,通过python-telegram-bot库发送消息到指定聊天。
  • 企业微信/钉钉机器人:许多企业协作工具都支持webhook机器人,可以发送HTTP POST请求实现推送。
  • 控制台输出:对于简单的本地监控,直接print价格信息也是一种“推送”。

整合与定时任务:打造全自动推送系统

我们将获取价格和推送功能整合起来,并实现定时自动推送。

import time
import schedule  # 需要安装 schedule 库: pip install schedule
def job():
    print("正在获取比特币价格...")
    current_price = get_bitcoin_price()
    if current_price:
        message = f"【比特币价格提醒】当前价格: ${current_price:.2f}"
        print(f"获取成功: {message}")
        # 这里选择一种推送方式,例如Pushover
        # push_notification(message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)
        # 或者邮件
        # send_email_notification(message, SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, SENDER_PASSWORD, RECEIVER_EMAIL)
        print("推送任务完成。")
    else:
        print("获取价格失败,跳过推送。")
if __name__ == "__main__":
    print("比特币价格推送系统启动...")
    # 每隔10分钟执行一次任务
    schedule.every(10).minutes.do(job)
    # 也可以设置每天特定时间执行,例如每天早上9点
    # schedule.every().day.at("09:00").do(job)
    # 立即执行一次
    job()
    # 保持脚本运行,不断检查定时任务
    while True:
        schedule.run_pending()
        time.sleep(1)

schedule库让定时任务变得非常简单。schedule.every(10).minutes.do(job)表示每10分钟执行一次job函数。while True循环确保脚本持续运行,不断检查并执行到期的任务。

进阶与优化

  1. 价格阈值提醒:可以设置价格高于或低于某个特定值时才推送,避免信息过载。
    def job():
        current_price = get_bitcoin_price()
        if current_price:
            if current_price > 50000:  # 价格高于5万美元才推送
                message = f"比特币价格突破5万美元!当前价格: ${current_price:.2f}"
                push_notification(message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)