测试函数
Python实战:打造你的比特币价格实时推送系统
在数字货币波动的世界里,比特币的价格牵动着无数投资者的心,手动刷新交易所查看价格既耗时又容易错过最佳时机,有没有一种方法能让我们实时掌握比特币价格变化,并在关键时刻收到提醒呢?答案是肯定的!利用Python,我们可以轻松构建一个比特币价格实时推送系统,本文将详细介绍如何使用Python获取比特币价格,并通过多种方式实现推送功能。
为什么选择Python
Python作为一种简洁、易学且功能强大的编程语言,在金融数据分析和自动化领域应用广泛,它拥有丰富的第三方库,能够轻松处理网络请求、数据解析和消息发送,是构建此类自动化系统的理想选择。
准备工作:环境与库的搭建
在开始编写代码之前,我们需要准备以下工具和库:
- Python环境:确保你的电脑上已安装Python(建议3.6及以上版本)。
- 必要的Python库:
requests:用于发送HTTP请求,获取比特币价格数据。pushover或telegram-bot或smtp(用于邮件):用于实现消息推送功能,这里我们以requests和pushover(一个流行的推送服务)为例,邮件推送可借助smtplib库实现。
- 获取API密钥:
- 我们需要一个提供比特币价格API的服务,许多免费的API可供选择,例如
CoinGecko、CoinMarketCap等,这里我们以CoinGecko的公开API为例,它无需API Key即可使用。 - 如果选择
pushover,需要在其官网注册并获取用户密钥和应用API令牌。
- 我们需要一个提供比特币价格API的服务,许多免费的API可供选择,例如
安装所需库,可以在命令行中运行:
pip install requests pip install pushover-complete # 或者其他你选择的推送库
核心步骤:获取比特币价格
我们需要编写一个Python函数,从API获取比特币的最新价格。CoinGecko API提供了一个简单易用的接口。
import requests
import json
def get_bitcoin_price():
"""
从CoinGecko API获取比特币价格
:return: 比特币当前价格(美元),如果获取失败则返回None
"""
url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
try:
response = requests.get(url)
response.raise_for_status() # 如果请求失败则抛出异常
data = response.json()
price = data['bitcoin']['usd']
return price
except requests.exceptions.RequestException as e:
print(f"获取价格时发生错误: {e}")
return None
except (KeyError, json.JSONDecodeError) as e:
print(f"解析价格数据时发生错误: {e}")
return None
if __name__ == "__main__":
current_price = get_bitcoin_price()
if current_price:
print(f"当前比特币价格: ${current_price:.2f}")
这段代码定义了get_bitcoin_price函数,它发送GET请求到CoinGecko API,解析返回的JSON数据,并提取出比特币对美元的价格,我们加入了错误处理,以应对网络问题或API返回异常的情况。
实现价格推送功能
获取到价格后,下一步就是将其推送到我们的设备上,这里介绍几种常见的推送方式:
使用Pushover推送(推荐,移动端即时通知)
Pushover是一个简单易用的推送通知服务,支持iOS、Android等平台。
- 注册Pushover账号并获取API Token和User Key。
- 安装
pushover-complete库。 - 编写推送函数:
from pushover import Pushover
def push_notification(message, api_token, user_key):
"""
使用Pushover发送通知
:param message: 推送消息内容
:param api_token: Pushover应用API Token
:param user_key: Pushover用户Key
"""
try:
p = Pushover(api_token)
p.message(message)
p.user(user_key)
p.send()
print("推送成功!")
except Exception as e:
print(f"推送失败: {e}")
# 替换为你的Pushover API Token和User Key
PUSHOVER_API_TOKEN = "your_api_token_here"
PUSHOVER_USER_KEY = "your_user_key_here"
if __name__ == "__main__":
p
rice = get_bitcoin_price()
if price:
notification_message = f"比特币价格更新: ${price:.2f}"
push_notification(notification_message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)
邮件推送
Python内置的smtplib库可以用来发送邮件。
import smtplib
from email.mime.text import MIMEText
def send_email_notification(message, smtp_server, smtp_port, sender_email, sender_password, receiver_email):
"""
发送邮件通知
:param message: 邮件内容
:param smtp_server: SMTP服务器地址,如 'smtp.gmail.com'
:param smtp_port: SMTP服务器端口,如 587
:param sender_email: 发件人邮箱
:param sender_password: 发件人邮箱密码(部分邮箱需要应用专用密码)
:param receiver_email: 收件人邮箱
"""
try:
msg = MIMEText(message)
msg['Subject'] = '比特币价格更新'
msg['From'] = sender_email
msg['To'] = receiver_email
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls() # 启用TLS加密
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email, msg.as_string())
print("邮件发送成功!")
except Exception as e:
print(f"邮件发送失败: {e}")
# 替换为你的邮箱信息(注意:使用Gmail等可能需要生成应用专用密码)
SMTP_SERVER = 'smtp.example.com' # 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@example.com'
SENDER_PASSWORD = 'your_email_password_or_app_password'
RECEIVER_EMAIL = 'receiver_email@example.com'
if __name__ == "__main__":
price = get_bitcoin_price()
if price:
email_message = f"当前比特币价格: ${price:.2f}"
send_email_notification(email_message, SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, SENDER_PASSWORD, RECEIVER_EMAIL)
其他推送方式
- Telegram Bot:可以创建一个Telegram机器人,通过
python-telegram-bot库发送消息到指定聊天。 - 企业微信/钉钉机器人:许多企业协作工具都支持webhook机器人,可以发送HTTP POST请求实现推送。
- 控制台输出:对于简单的本地监控,直接
print价格信息也是一种“推送”。
整合与定时任务:打造全自动推送系统
我们将获取价格和推送功能整合起来,并实现定时自动推送。
import time
import schedule # 需要安装 schedule 库: pip install schedule
def job():
print("正在获取比特币价格...")
current_price = get_bitcoin_price()
if current_price:
message = f"【比特币价格提醒】当前价格: ${current_price:.2f}"
print(f"获取成功: {message}")
# 这里选择一种推送方式,例如Pushover
# push_notification(message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)
# 或者邮件
# send_email_notification(message, SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, SENDER_PASSWORD, RECEIVER_EMAIL)
print("推送任务完成。")
else:
print("获取价格失败,跳过推送。")
if __name__ == "__main__":
print("比特币价格推送系统启动...")
# 每隔10分钟执行一次任务
schedule.every(10).minutes.do(job)
# 也可以设置每天特定时间执行,例如每天早上9点
# schedule.every().day.at("09:00").do(job)
# 立即执行一次
job()
# 保持脚本运行,不断检查定时任务
while True:
schedule.run_pending()
time.sleep(1)
schedule库让定时任务变得非常简单。schedule.every(10).minutes.do(job)表示每10分钟执行一次job函数。while True循环确保脚本持续运行,不断检查并执行到期的任务。
进阶与优化
- 价格阈值提醒:可以设置价格高于或低于某个特定值时才推送,避免信息过载。
def job(): current_price = get_bitcoin_price() if current_price: if current_price > 50000: # 价格高于5万美元才推送 message = f"比特币价格突破5万美元!当前价格: ${current_price:.2f}" push_notification(message, PUSHOVER_API_TOKEN, PUSHOVER_USER_KEY)