Skip to content

Commit

Permalink
Implement endpoints for creating and updating sheet (#30)
Browse files Browse the repository at this point in the history
* wip

* Refactoring

* Fix tests

* Refactoring

* Add tests for create-sheet endpoint

* Initial update-sheet implementation

* Asyncify part of update-sheet endpoint

* wip

* Update tests

* Refactoring wip

* Refactoring

* Implement get-all-sheet-titles endpoint

* Update tests
  • Loading branch information
rjambrecic authored Jun 27, 2024
1 parent 95d393d commit f3d8368
Show file tree
Hide file tree
Showing 6 changed files with 641 additions and 109 deletions.
223 changes: 128 additions & 95 deletions google_sheets/app.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import json
import logging
import urllib.parse
from os import environ
from pathlib import Path
from typing import Annotated, Any, Dict, List, Union
from typing import Annotated, Dict, List, Union

import httpx
from asyncify import asyncify
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi import FastAPI, HTTPException, Query, Request, Response, status
from fastapi.responses import RedirectResponse
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from prisma.errors import RecordNotFoundError
from googleapiclient.errors import HttpError

from . import __version__
from .db_helpers import get_db_connection
from .google_api import (
build_service,
create_sheet_f,
get_all_sheet_titles_f,
get_files_f,
get_google_oauth_url,
get_sheet_f,
get_token_request_data,
oauth2_settings,
update_sheet_f,
)
from .model import GoogleSheetValues

__all__ = ["app"]

Expand All @@ -33,19 +40,6 @@
title="google-sheets",
)

# Load client secret data from the JSON file
with Path("client_secret.json").open() as secret_file:
client_secret_data = json.load(secret_file)

# OAuth2 configuration
oauth2_settings = {
"auth_uri": client_secret_data["web"]["auth_uri"],
"tokenUrl": client_secret_data["web"]["token_uri"],
"clientId": client_secret_data["web"]["client_id"],
"clientSecret": client_secret_data["web"]["client_secret"],
"redirectUri": client_secret_data["web"]["redirect_uris"][0],
}


async def is_authenticated_for_ads(user_id: int) -> bool:
async with get_db_connection() as db:
Expand All @@ -68,12 +62,7 @@ async def get_login_url(
if is_authenticated:
return {"login_url": "User is already authenticated"}

google_oauth_url = (
f"{oauth2_settings['auth_uri']}?client_id={oauth2_settings['clientId']}"
f"&redirect_uri={oauth2_settings['redirectUri']}&response_type=code"
f"&scope={urllib.parse.quote_plus('email https://www.googleapis.com/auth/spreadsheets https://www.googleapis.com/auth/drive.metadata.readonly')}"
f"&access_type=offline&prompt=consent&state={user_id}"
)
google_oauth_url = get_google_oauth_url(user_id)
markdown_url = f"To navigate Google Ads waters, I require access to your account. Please [click here]({google_oauth_url}) to grant permission."
return {"login_url": markdown_url}

Expand All @@ -92,13 +81,7 @@ async def login_callback(
raise HTTPException(status_code=400, detail="User ID must be an integer")
user_id = int(state)

token_request_data = {
"code": code,
"client_id": oauth2_settings["clientId"],
"client_secret": oauth2_settings["clientSecret"],
"redirect_uri": oauth2_settings["redirectUri"],
"grant_type": "authorization_code",
}
token_request_data = get_token_request_data(code)

async with httpx.AsyncClient() as client:
response = await client.post(
Expand Down Expand Up @@ -140,63 +123,22 @@ async def login_callback(
return RedirectResponse(url=f"{base_url}/login/success")


async def load_user_credentials(user_id: Union[int, str]) -> Any:
async with get_db_connection() as db:
try:
data = await db.gauth.find_unique_or_raise(where={"user_id": user_id}) # type: ignore[typeddict-item]
except RecordNotFoundError as e:
raise HTTPException(
status_code=404, detail="User hasn't grant access yet!"
) from e

return data.creds


async def _build_service(user_id: int, service_name: str, version: str) -> Any:
user_credentials = await load_user_credentials(user_id)
sheets_credentials: Dict[str, str] = {
"refresh_token": user_credentials["refresh_token"],
"client_id": oauth2_settings["clientId"],
"client_secret": oauth2_settings["clientSecret"],
}

creds = Credentials.from_authorized_user_info( # type: ignore[no-untyped-call]
info=sheets_credentials,
scopes=[
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive.metadata.readonly",
],
)
service = build(serviceName=service_name, version=version, credentials=creds)
return service


@asyncify # type: ignore[misc]
def _get_sheet(service: Any, spreadsheet_id: str, range: str) -> Any:
# Call the Sheets API
sheet = service.spreadsheets()
result = sheet.values().get(spreadsheetId=spreadsheet_id, range=range).execute()
values = result.get("values", [])

return values


@app.get("/sheet", description="Get data from a Google Sheet")
@app.get("/get-sheet", description="Get data from a Google Sheet")
async def get_sheet(
user_id: Annotated[
int, Query(description="The user ID for which the data is requested")
],
spreadsheet_id: Annotated[
str, Query(description="ID of the Google Sheet to fetch data from")
],
range: Annotated[
title: Annotated[
str,
Query(description="The range of cells to fetch data from. E.g. 'Sheet1!A1:B2'"),
Query(description="The title of the sheet to fetch data from"),
],
) -> Union[str, List[List[str]]]:
service = await _build_service(user_id=user_id, service_name="sheets", version="v4")
values = await _get_sheet(
service=service, spreadsheet_id=spreadsheet_id, range=range
service = await build_service(user_id=user_id, service_name="sheets", version="v4")
values = await get_sheet_f(
service=service, spreadsheet_id=spreadsheet_id, range=title
)

if not values:
Expand All @@ -205,20 +147,85 @@ async def get_sheet(
return values # type: ignore[no-any-return]


@asyncify # type: ignore[misc]
def _get_files(service: Any) -> List[Dict[str, str]]:
# Call the Drive v3 API
results = (
service.files()
.list(
q="mimeType='application/vnd.google-apps.spreadsheet'",
pageSize=100, # The default value is 100
fields="nextPageToken, files(id, name)",
@app.post(
"/update-sheet",
description="Update data in a Google Sheet within the existing spreadsheet",
)
async def update_sheet(
user_id: Annotated[
int, Query(description="The user ID for which the data is requested")
],
spreadsheet_id: Annotated[
str, Query(description="ID of the Google Sheet to fetch data from")
],
title: Annotated[
str,
Query(description="The title of the sheet to update"),
],
sheet_values: GoogleSheetValues,
) -> Response:
service = await build_service(user_id=user_id, service_name="sheets", version="v4")

try:
await update_sheet_f(
service=service,
spreadsheet_id=spreadsheet_id,
range=title,
sheet_values=sheet_values,
)
.execute()
except HttpError as e:
raise HTTPException(status_code=e.status_code, detail=e._get_reason()) from e
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
) from e

return Response(
status_code=status.HTTP_200_OK,
content=f"Sheet with the name '{title}' has been updated successfully.",
)


@app.post(
"/create-sheet",
description="Create a new Google Sheet within the existing spreadsheet",
)
async def create_sheet(
user_id: Annotated[
int, Query(description="The user ID for which the data is requested")
],
spreadsheet_id: Annotated[
str, Query(description="ID of the Google Sheet to fetch data from")
],
title: Annotated[
str,
Query(description="The title of the new sheet"),
],
) -> Response:
service = await build_service(user_id=user_id, service_name="sheets", version="v4")
try:
await create_sheet_f(
service=service, spreadsheet_id=spreadsheet_id, title=title
)
except HttpError as e:
if (
e.status_code == status.HTTP_400_BAD_REQUEST
and f'A sheet with the name "{title}" already exists' in e._get_reason()
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f'A sheet with the name "{title}" already exists. Please enter another name.',
) from e
raise HTTPException(status_code=e.status_code, detail=e._get_reason()) from e
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
) from e

return Response(
status_code=status.HTTP_201_CREATED,
content=f"Sheet with the name '{title}' has been created successfully.",
)
items = results.get("files", [])
return items # type: ignore[no-any-return]


@app.get("/get-all-file-names", description="Get all sheets associated with the user")
Expand All @@ -227,8 +234,34 @@ async def get_all_file_names(
int, Query(description="The user ID for which the data is requested")
],
) -> Dict[str, str]:
service = await _build_service(user_id=user_id, service_name="drive", version="v3")
files: List[Dict[str, str]] = await _get_files(service=service)
service = await build_service(user_id=user_id, service_name="drive", version="v3")
files: List[Dict[str, str]] = await get_files_f(service=service)
# create dict where key is id and value is name
files_dict = {file["id"]: file["name"] for file in files}
return files_dict


@app.get(
"/get-all-sheet-titles",
description="Get all sheet titles within a Google Spreadsheet",
)
async def get_all_sheet_titles(
user_id: Annotated[
int, Query(description="The user ID for which the data is requested")
],
spreadsheet_id: Annotated[
str, Query(description="ID of the Google Sheet to fetch data from")
],
) -> List[str]:
service = await build_service(user_id=user_id, service_name="sheets", version="v4")
try:
sheets = await get_all_sheet_titles_f(
service=service, spreadsheet_id=spreadsheet_id
)
except HttpError as e:
raise HTTPException(status_code=e.status_code, detail=e._get_reason()) from e
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
) from e
return sheets
25 changes: 25 additions & 0 deletions google_sheets/google_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .oauth_settings import (
get_google_oauth_url,
get_token_request_data,
oauth2_settings,
)
from .service import (
build_service,
create_sheet_f,
get_all_sheet_titles_f,
get_files_f,
get_sheet_f,
update_sheet_f,
)

__all__ = [
"build_service",
"create_sheet_f",
"get_all_sheet_titles_f",
"get_files_f",
"get_google_oauth_url",
"get_sheet_f",
"get_token_request_data",
"oauth2_settings",
"update_sheet_f",
]
40 changes: 40 additions & 0 deletions google_sheets/google_api/oauth_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import json
import urllib.parse
from pathlib import Path
from typing import Any, Dict

__all__ = ["get_google_oauth_url", "get_token_request_data", "oauth2_settings"]


# Load client secret data from the JSON file
with Path("client_secret.json").open() as secret_file:
client_secret_data = json.load(secret_file)

# OAuth2 configuration
oauth2_settings = {
"auth_uri": client_secret_data["web"]["auth_uri"],
"tokenUrl": client_secret_data["web"]["token_uri"],
"clientId": client_secret_data["web"]["client_id"],
"clientSecret": client_secret_data["web"]["client_secret"],
"redirectUri": client_secret_data["web"]["redirect_uris"][0],
}


def get_google_oauth_url(user_id: int) -> str:
google_oauth_url = (
f"{oauth2_settings['auth_uri']}?client_id={oauth2_settings['clientId']}"
f"&redirect_uri={oauth2_settings['redirectUri']}&response_type=code"
f"&scope={urllib.parse.quote_plus('email https://www.googleapis.com/auth/spreadsheets https://www.googleapis.com/auth/drive.metadata.readonly')}"
f"&access_type=offline&prompt=consent&state={user_id}"
)
return google_oauth_url


def get_token_request_data(code: str) -> Dict[str, Any]:
return {
"code": code,
"client_id": oauth2_settings["clientId"],
"client_secret": oauth2_settings["clientSecret"],
"redirect_uri": oauth2_settings["redirectUri"],
"grant_type": "authorization_code",
}
Loading

0 comments on commit f3d8368

Please sign in to comment.