datetime:2022/09/24 15:52
author:nzb
新冠病毒疫情跟踪器API
from typing import List
import requests
from fastapi import APIRouter, Depends, HTTPException, status, Request, BackgroundTasks
from fastapi.templating import Jinja2Templates
from pydantic import HttpUrl
from sqlalchemy.orm import Session
from .database import Base, engine, SessionLocal
from . import curd, schemas, models
application = APIRouter()
templates = Jinja2Templates(directory="./coronavirus/templates")
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@application.post("/create_city", response_model=schemas.ReadCity)
async def create_city(city: schemas.CreateCity, db: Session = Depends(get_db)):
db_city = curd.get_city_by_name(db, name=city.province)
if db_city:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="City already registered")
return curd.create_city(db, city)
@application.get("/get_city/{city}", response_model=schemas.ReadCity)
async def get_city(city: str, db: Session = Depends(get_db)):
db_city = curd.get_city_by_name(db, name=city)
if db_city is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="City not found")
return db_city
@application.get("/get_cities", response_model=List[schemas.ReadCity])
async def get_cities(offset: int = 0, limit: int = 100, db: Session = Depends(get_db)):
cities = curd.get_cities(db, offset=offset, limit=limit)
return cities
@application.post("/create_data", response_model=schemas.ReadData)
async def ceate_data_for_city(city: str, data: schemas.CreateData, db: Session = Depends(get_db)):
db_city = curd.get_city_by_name(db, name=city)
if db_city is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="City not found")
data = curd.create_city_data(db=db, data=data, city_id=db_city.id)
return data
@application.get("/get_data")
def get_data(city: str = None, offset: int = 0, limit: int = 100, db: Session = Depends(get_db)):
data = curd.get_data(db, city=city, offset=offset, limit=limit)
return data
def bg_task(url: HttpUrl, db: Session):
"""这里注意一个坑,不要在后台任务的参数中db: Session = Depends(get_db)这样导入依赖"""
city_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=false")
if city_data.status_code == 200:
db.query(models.City).delete()
for loc in city_data.json().get("locations", []):
city = {
"province": loc["province"],
"country": loc["country"],
"country_code": "CN",
"country_population": loc["country_population"]
}
curd.create_city(db, schemas.CreateCity(**city))
coronavirus_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=true")
if coronavirus_data.status_code == 200:
db.query(models.Data).delete()
for city in coronavirus_data.json().get("locations", []):
db_city = curd.get_city_by_name(db, name=city.get("province", ""))
for date, confirmed in city["timelines"]['confirmed']['timeline'].items():
data = {
"date": date.split("T")[0],
"confirmed": confirmed,
"deaths": city["timelines"]["deaths"]["timeline"][date],
"recovered": 0
}
curd.create_city_data(db, schemas.CreateData(**data), city_id=db_city.id)
@application.get("/sync_coronavirus_data/jhu")
async def sync_coronavirus_data(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
"""从Johns Hopkins University同步COVID-19数据"""
background_tasks.add_task(bg_task, "https://coronavirus-tracker-api.herokuapp.com/v2/locations", db)
return {"message": "正在后台同步数据..."}
@application.get("/")
async def coronavirus(request: Request, city: str = None, offset: int = 0, limit: int = 100,
db: Session = Depends(get_db)):
data = curd.get_data(db, city=city, offset=offset, limit=limit)
return templates.TemplateResponse("home.html", {
"request": request,
"data": data,
"sync_data_url": "/coronavirus/sync_coronavirus_data/jhu"
})
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./coronavirus.sqlite3"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
Base = declarative_base(bind=engine, name="Base")
from sqlalchemy.orm import Session
from . import models, schemas
def get_city(db: Session, city_id: int):
return db.query(models.City).filter(models.City.id == city_id).first()
def get_city_by_name(db: Session, name: str):
return db.query(models.City).filter(models.City.province == name).first()
def get_cities(db: Session, offset: int = 0, limit: int = 100):
return db.query(models.City).offset(offset).limit(limit).all()
def create_city(db: Session, city: schemas.CreateCity):
db_city = models.City(**city.dict())
db.add(db_city)
db.commit()
db.refresh(db_city)
return db_city
def get_data(db: Session, city: str = None, offset: int = 0, limit: int = 100):
if city:
return db.query(models.Data).filter(
models.Data.city.has(province=city)).all()
return db.query(models.Data).offset(offset).limit(limit).all()
def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
db_data = models.Data(**data.dict(), city_id=city_id)
db.add(db_data)
db.commit()
db.refresh(db_data)
return db_data
from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
from .database import Base
class City(Base):
__tablename__ = "city"
id = Column(Integer, autoincrement=True, primary_key=True, index=True)
province = Column(String(100), unique=True, nullable=False, comment="省/直辖市")
country = Column(String(100), nullable=False, comment="国家")
country_code = Column(String(100), nullable=True, comment="国家代码")
country_population = Column(BigInteger, nullable=False, comment="国家人口")
data = relationship('Data', back_populates="city")
created_at = Column(DateTime, server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
__mapper_args__ = {"order_by": country_code}
def __repr__(self):
return f"{self.country}_{self.province}"
class Data(Base):
__tablename__ = "data"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
city_id = Column(Integer, ForeignKey('city.id'), comment="所属省/直辖市")
date = Column(Date, nullable=False, comment="数据时间")
confirmed = Column(BigInteger, default=0, nullable=False, comment="确诊数量")
deaths = Column(BigInteger, default=0, nullable=False, comment="死亡数量")
recovered = Column(BigInteger, default=0, nullable=False, comment="痊愈数量")
city = relationship("City", back_populates='data')
created_at = Column(DateTime, server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
__mapper_args__ = {"order_by": date.desc()}
def __repr__(self):
return f"{repr(self.date)}:确诊{self.confirmed}例"
""" 附上三个SQLAlchemy教程
SQLAlchemy的基本操作大全
http://www.taodudu.cc/news/show-175725.html
Python3+SQLAlchemy+Sqlite3实现ORM教程
https://www.cnblogs.com/jiangxiaobo/p/12350561.html
SQLAlchemy基础知识 Autoflush和Autocommit
https://zhuanlan.zhihu.com/p/48994990
"""
from datetime import date as date_
from datetime import datetime
from pydantic import BaseModel
class CreateData(BaseModel):
date: date_
confirmed: int = 0
deaths: int = 0
recovered: int = 0
class CreateCity(BaseModel):
province: str
country: str
country_code: str
country_population: int
class ReadData(CreateData):
id: int
city_id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
class ReadCity(CreateCity):
id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
<!DOCTYPE html>
<html lang="en">
<head>
<title>新冠病毒疫情跟踪器</title>
<link rel="stylesheet" href="{ { url_for('static', path='/semantic.min.css') } }">
<script src="{ { url_for('static', path='/jquery-3.5.1/jquery-3.5.1.min.js') } }"></script>
<script src="{ { url_for('static', path='/semantic.min.js') } }"></script>
<script>
$(document).ready(function () {
$("#filter").click(function () {
const city = $("#city").val();
window.location.href = "http://" + window.location.host + "/coronavirus?city=" + city;
});
$("#sync").click(function () {
$.get("{ { sync_data_url } }", function (result) {
alert("Message: " + result.message);
});
});
});
</script>
</head>
<body>
<div class="ui container">
<h2></h2>
<h1 style="text-align: center">新冠病毒疫情跟踪器</h1>
<h2></h2>
<button id="filter" style="float: left" type="submit" class="ui button alert-secondary">过滤</button>
<div class="ui input">
<label for="city"></label><input id="city" type="text" placeholder="城市" value="">
</div>
<button id="sync" style="float: right" type="submit" class="ui button primary">同步数据</button>
<table class="ui celled table">
<thead>
<tr>
<th>城市</th>
<th>日期</th>
<th>累计确诊数</th>
<th>累计死亡数</th>
<th>累计痊愈数</th>
<th>更新时间</th>
</tr>
</thead>
<tbody>
{% for d in data %}
<tr>
<td>{ { d.city.province } }</td>
<td>{ { d.date } }</td>
<td>{ { d.confirmed } }</td>
<td>{ { d.deaths } }</td>
<td>{ { d.recovered } }</td>
<td>{ { d.updated_at } }</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</body>
</html>