|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
database.py
- # @bekbrace
- # FARMSTACK Tutorial - Sunday 13.06.2021
-
- import motor.motor_asyncio
- from model import Todo
-
- client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017/')
- database = client.TodoList
- collection = database.todo
-
- async def fetch_one_todo(title):
- document = await collection.find_one({"title": title})
- return document
-
- async def fetch_all_todos():
- todos = []
- cursor = collection.find({})
- async for document in cursor:
- todos.append(Todo(**document))
- return todos
-
- async def create_todo(todo):
- document = todo
- result = await collection.insert_one(document)
- return document
-
-
- async def update_todo(title, desc):
- await collection.update_one({"title": title}, {"$set": {"description": desc}})
- document = await collection.find_one({"title": title})
- return document
-
- async def remove_todo(title):
- await collection.delete_one({"title": title})
- return True
复制代码
model.py
- # @bekbrace
- # FARMSTACK Tutorial - Sunday 13.06.2021
-
- # Pydantic allows auto creation of JSON Schemas from models
- from pydantic import BaseModel
-
- class Todo(BaseModel):
- title: str
- description: str
复制代码
main.py
- # @bekbrace
- # FARMSTACK Tutorial - Sunday 13.06.2021
-
- from fastapi import FastAPI, HTTPException
-
- from model import Todo
-
- from database import (
- fetch_one_todo,
- fetch_all_todos,
- create_todo,
- update_todo,
- remove_todo,
- )
-
- # an HTTP-specific exception class to generate exception information
-
- from fastapi.middleware.cors import CORSMiddleware
- app = FastAPI()
-
- origins = [
- "http://localhost:3000",
- ]
-
- # what is a middleware?
- # software that acts as a bridge between an operating system or database and applications, especially on a network.
-
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
-
- @app.get("/")
- async def read_root():
- return {"Hello": "World"}
-
- @app.get("/api/todo")
- async def get_todo():
- response = await fetch_all_todos()
- return response
-
- @app.get("/api/todo/{title}", response_model=Todo)
- async def get_todo_by_title(title):
- response = await fetch_one_todo(title)
- if response:
- return response
- raise HTTPException(404, f"There is no todo with the title {title}")
-
- @app.post("/api/todo/", response_model=Todo)
- async def post_todo(todo: Todo):
- response = await create_todo(todo.dict())
- if response:
- return response
- raise HTTPException(400, "Something went wrong")
-
- @app.put("/api/todo/{title}/", response_model=Todo)
- async def put_todo(title: str, desc: str):
- response = await update_todo(title, desc)
- if response:
- return response
- raise HTTPException(404, f"There is no todo with the title {title}")
-
- @app.delete("/api/todo/{title}")
- async def delete_todo(title):
- response = await remove_todo(title)
- if response:
- return "Successfully deleted todo"
- raise HTTPException(404, f"There is no todo with the title {title}")
复制代码
|
|