diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9c0d70a265ad2..ed0ce173bc55b 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -5989,68 +5989,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/variables/import: - post: - tags: - - Variable - summary: Import Variables - description: Import variables from a JSON file. - operationId: import_variables - parameters: - - name: action_if_exists - in: query - required: false - schema: - enum: - - overwrite - - fail - - skip - type: string - default: fail - title: Action If Exists - requestBody: - required: true - content: - multipart/form-data: - schema: - $ref: '#/components/schemas/Body_import_variables' - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/VariablesImportResponse' - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Bad Request - '409': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Conflict - '422': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unprocessable Entity /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}: get: tags: @@ -6629,16 +6567,6 @@ components: - status title: BaseInfoResponse description: Base info serializer for responses. - Body_import_variables: - properties: - file: - type: string - format: binary - title: File - type: object - required: - - file - title: Body_import_variables ClearTaskInstancesBody: properties: dry_run: @@ -10066,26 +9994,6 @@ components: - is_encrypted title: VariableResponse description: Variable serializer for responses. - VariablesImportResponse: - properties: - created_variable_keys: - items: - type: string - type: array - title: Created Variable Keys - import_count: - type: integer - title: Import Count - created_count: - type: integer - title: Created Count - type: object - required: - - created_variable_keys - - import_count - - created_count - title: VariablesImportResponse - description: Import Variables serializer for responses. VersionInfo: properties: version: diff --git a/airflow/api_fastapi/core_api/routes/public/variables.py b/airflow/api_fastapi/core_api/routes/public/variables.py index 19d1b24d7eba8..6bd850d76e609 100644 --- a/airflow/api_fastapi/core_api/routes/public/variables.py +++ b/airflow/api_fastapi/core_api/routes/public/variables.py @@ -16,10 +16,9 @@ # under the License. from __future__ import annotations -import json -from typing import Annotated, Literal +from typing import Annotated -from fastapi import Depends, HTTPException, Query, UploadFile, status +from fastapi import Depends, HTTPException, Query, status from fastapi.exceptions import RequestValidationError from pydantic import ValidationError from sqlalchemy import select @@ -39,7 +38,6 @@ VariableBulkResponse, VariableCollectionResponse, VariableResponse, - VariablesImportResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.services.public.variables import ( @@ -192,59 +190,6 @@ def post_variable( return variable -@variables_router.post( - "/import", - status_code=status.HTTP_200_OK, - responses=create_openapi_http_exception_doc( - [status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY] - ), -) -def import_variables( - file: UploadFile, - session: SessionDep, - action_if_exists: Literal["overwrite", "fail", "skip"] = "fail", -) -> VariablesImportResponse: - """Import variables from a JSON file.""" - try: - file_content = file.file.read().decode("utf-8") - variables = json.loads(file_content) - - if not isinstance(variables, dict): - raise ValueError("Uploaded JSON must contain key-value pairs.") - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON format: {e}") - - if not variables: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="No variables found in the provided JSON.", - ) - - existing_keys = {variable for variable in session.execute(select(Variable.key)).scalars()} - import_keys = set(variables.keys()) - - matched_keys = existing_keys & import_keys - - if action_if_exists == "fail" and matched_keys: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"The variables with these keys: {matched_keys} already exists.", - ) - elif action_if_exists == "skip": - create_keys = import_keys - matched_keys - else: - create_keys = import_keys - - for key in create_keys: - Variable.set(key=key, value=variables[key], session=session) - - return VariablesImportResponse( - created_count=len(create_keys), - import_count=len(import_keys), - created_variable_keys=list(create_keys), - ) - - @variables_router.patch("") def bulk_variables( request: VariableBulkBody, diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index a34f57020cd83..b6cf77099005b 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1625,9 +1625,6 @@ export type PoolServicePostPoolMutationResult = Awaited >; -export type VariableServiceImportVariablesMutationResult = Awaited< - ReturnType ->; export type BackfillServicePauseBackfillMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a87e218adce4c..a43172d73c7ca 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -32,7 +32,6 @@ import { } from "../requests/services.gen"; import { BackfillPostBody, - Body_import_variables, ClearTaskInstancesBody, ConnectionBody, ConnectionBulkBody, @@ -3124,46 +3123,6 @@ export const useVariableServicePostVariable = < VariableService.postVariable({ requestBody }) as unknown as Promise, ...options, }); -/** - * Import Variables - * Import variables from a JSON file. - * @param data The data for the request. - * @param data.formData - * @param data.actionIfExists - * @returns VariablesImportResponse Successful Response - * @throws ApiError - */ -export const useVariableServiceImportVariables = < - TData = Common.VariableServiceImportVariablesMutationResult, - TError = unknown, - TContext = unknown, ->( - options?: Omit< - UseMutationOptions< - TData, - TError, - { - actionIfExists?: "overwrite" | "fail" | "skip"; - formData: Body_import_variables; - }, - TContext - >, - "mutationFn" - >, -) => - useMutation< - TData, - TError, - { - actionIfExists?: "overwrite" | "fail" | "skip"; - formData: Body_import_variables; - }, - TContext - >({ - mutationFn: ({ actionIfExists, formData }) => - VariableService.importVariables({ actionIfExists, formData }) as unknown as Promise, - ...options, - }); /** * Pause Backfill * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 99341b970cac1..598c487ccbd02 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -482,19 +482,6 @@ export const $BaseInfoResponse = { description: "Base info serializer for responses.", } as const; -export const $Body_import_variables = { - properties: { - file: { - type: "string", - format: "binary", - title: "File", - }, - }, - type: "object", - required: ["file"], - title: "Body_import_variables", -} as const; - export const $ClearTaskInstancesBody = { properties: { dry_run: { @@ -5749,30 +5736,6 @@ export const $VariableResponse = { description: "Variable serializer for responses.", } as const; -export const $VariablesImportResponse = { - properties: { - created_variable_keys: { - items: { - type: "string", - }, - type: "array", - title: "Created Variable Keys", - }, - import_count: { - type: "integer", - title: "Import Count", - }, - created_count: { - type: "integer", - title: "Created Count", - }, - }, - type: "object", - required: ["created_variable_keys", "import_count", "created_count"], - title: "VariablesImportResponse", - description: "Import Variables serializer for responses.", -} as const; - export const $VersionInfo = { properties: { version: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 0ce36911eca13..7f888aca34da8 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -187,8 +187,6 @@ import type { PostVariableResponse, BulkVariablesData, BulkVariablesResponse, - ImportVariablesData, - ImportVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetHealthResponse, @@ -3095,34 +3093,6 @@ export class VariableService { }, }); } - - /** - * Import Variables - * Import variables from a JSON file. - * @param data The data for the request. - * @param data.formData - * @param data.actionIfExists - * @returns VariablesImportResponse Successful Response - * @throws ApiError - */ - public static importVariables(data: ImportVariablesData): CancelablePromise { - return __request(OpenAPI, { - method: "POST", - url: "/public/variables/import", - query: { - action_if_exists: data.actionIfExists, - }, - formData: data.formData, - mediaType: "multipart/form-data", - errors: { - 400: "Bad Request", - 401: "Unauthorized", - 403: "Forbidden", - 409: "Conflict", - 422: "Unprocessable Entity", - }, - }); - } } export class DagParsingService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 09911de7c9745..81925913fba42 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -138,10 +138,6 @@ export type BaseInfoResponse = { status: string | null; }; -export type Body_import_variables = { - file: Blob | File; -}; - /** * Request body for Clear Task Instances endpoint. */ @@ -1389,15 +1385,6 @@ export type VariableResponse = { is_encrypted: boolean; }; -/** - * Import Variables serializer for responses. - */ -export type VariablesImportResponse = { - created_variable_keys: Array; - import_count: number; - created_count: number; -}; - /** * Version information serializer for responses. */ @@ -2259,13 +2246,6 @@ export type BulkVariablesData = { export type BulkVariablesResponse = VariableBulkResponse; -export type ImportVariablesData = { - actionIfExists?: "overwrite" | "fail" | "skip"; - formData: Body_import_variables; -}; - -export type ImportVariablesResponse = VariablesImportResponse; - export type ReparseDagFileData = { fileToken: string; }; @@ -4766,37 +4746,6 @@ export type $OpenApiTs = { }; }; }; - "/public/variables/import": { - post: { - req: ImportVariablesData; - res: { - /** - * Successful Response - */ - 200: VariablesImportResponse; - /** - * Bad Request - */ - 400: HTTPExceptionResponse; - /** - * Unauthorized - */ - 401: HTTPExceptionResponse; - /** - * Forbidden - */ - 403: HTTPExceptionResponse; - /** - * Conflict - */ - 409: HTTPExceptionResponse; - /** - * Unprocessable Entity - */ - 422: HTTPExceptionResponse; - }; - }; - }; "/public/parseDagFile/{file_token}": { put: { req: ReparseDagFileData; diff --git a/airflow/ui/src/pages/Variables/ImportVariablesForm.tsx b/airflow/ui/src/pages/Variables/ImportVariablesForm.tsx index 89029a1fd0aef..5645c91d4baf3 100644 --- a/airflow/ui/src/pages/Variables/ImportVariablesForm.tsx +++ b/airflow/ui/src/pages/Variables/ImportVariablesForm.tsx @@ -55,21 +55,65 @@ const ImportVariablesForm = ({ onClose }: ImportVariablesFormProps) => { onSuccessConfirm: onClose, }); - const [selectedFile, setSelectedFile] = useState(undefined); - const [actionIfExists, setActionIfExists] = useState<"fail" | "overwrite" | "skip" | undefined>("fail"); + const [actionIfExists, setActionIfExists] = useState<"fail" | "overwrite" | "skip">("fail"); + const [isParsing, setIsParsing] = useState(false); + const [fileContent, setFileContent] = useState | undefined>(undefined); + + const onFileChange = (file: File) => { + setIsParsing(true); + const reader = new FileReader(); + + reader.addEventListener("load", (event) => { + try { + const text = event.target?.result as string; + const parsedContent = JSON.parse(text) as unknown; + + if ( + typeof parsedContent === "object" && + parsedContent !== null && + Object.entries(parsedContent).every( + ([key, value]) => typeof key === "string" && typeof value === "string", + ) + ) { + const typedContent = parsedContent as Record; + + setFileContent(typedContent); + } else { + throw new Error("Invalid JSON format"); + } + } catch { + setError({ + body: { + detail: + 'Error Parsing JSON File: Upload a JSON file containing variables (e.g., {"key": "value", ...}).', + }, + }); + setFileContent(undefined); + } finally { + setIsParsing(false); + } + }); + + reader.readAsText(file); + }; const onSubmit = () => { setError(undefined); - if (selectedFile) { - const formData = new FormData(); + if (fileContent) { + const formattedPayload = { + actions: [ + { + action: "create" as const, + action_if_exists: actionIfExists, + variables: Object.entries(fileContent).map(([key, value]) => ({ + key, + value, + })), + }, + ], + }; - formData.append("file", selectedFile); - mutate({ - actionIfExists, - formData: { - file: selectedFile, - }, - }); + mutate({ requestBody: formattedPayload }); } }; @@ -82,7 +126,11 @@ const ImportVariablesForm = ({ onClose }: ImportVariablesFormProps) => { mb={6} onFileChange={(files) => { if (files.acceptedFiles.length > 0) { - setSelectedFile(files.acceptedFiles[0]); + setError(undefined); + setFileContent(undefined); + if (files.acceptedFiles[0]) { + onFileChange(files.acceptedFiles[0]); + } } }} required @@ -99,7 +147,8 @@ const ImportVariablesForm = ({ onClose }: ImportVariablesFormProps) => { focusVisibleRing="inside" me="-1" onClick={() => { - setSelectedFile(undefined); + setError(undefined); + setFileContent(undefined); }} pointerEvents="auto" size="xs" @@ -112,6 +161,11 @@ const ImportVariablesForm = ({ onClose }: ImportVariablesFormProps) => { > + {isParsing ? ( +
+ Parsing file... +
+ ) : undefined} { {isPending ? ( - +
- +
) : undefined} -
diff --git a/airflow/ui/src/queries/useImportVariables.ts b/airflow/ui/src/queries/useImportVariables.ts index b4692e37c535f..212d9c83e1993 100644 --- a/airflow/ui/src/queries/useImportVariables.ts +++ b/airflow/ui/src/queries/useImportVariables.ts @@ -19,36 +19,43 @@ import { useQueryClient } from "@tanstack/react-query"; import { useState } from "react"; -import { useVariableServiceGetVariablesKey, useVariableServiceImportVariables } from "openapi/queries"; +import { useVariableServiceBulkVariables, useVariableServiceGetVariablesKey } from "openapi/queries"; import { toaster } from "src/components/ui"; export const useImportVariables = ({ onSuccessConfirm }: { onSuccessConfirm: () => void }) => { const queryClient = useQueryClient(); const [error, setError] = useState(undefined); - const onSuccess = async (responseData: { - created_count: number; - created_variable_keys: Array; - import_count: number; - }) => { + const onSuccess = async (responseData: { create?: { errors: Array; success: Array } }) => { await queryClient.invalidateQueries({ queryKey: [useVariableServiceGetVariablesKey], }); - toaster.create({ - description: `${responseData.created_count} of ${responseData.import_count} variables imported successfully. Keys imported are ${responseData.created_variable_keys.join(", ")}`, - title: "Import Variables Request Successful", - type: "success", - }); + if (responseData.create) { + const { errors, success } = responseData.create; + + if (Array.isArray(errors) && errors.length > 0) { + const apiError = errors[0] as { error: string }; - onSuccessConfirm(); + setError({ + body: { detail: apiError.error }, + }); + } else if (Array.isArray(success) && success.length > 0) { + toaster.create({ + description: `${success.length} variables created successfully. Keys: ${success.join(", ")}`, + title: "Import Variables Request Successful", + type: "success", + }); + onSuccessConfirm(); + } + } }; const onError = (_error: unknown) => { setError(_error); }; - const { isPending, mutate } = useVariableServiceImportVariables({ + const { isPending, mutate } = useVariableServiceBulkVariables({ onError, onSuccess, }); diff --git a/tests/api_fastapi/core_api/routes/public/test_variables.py b/tests/api_fastapi/core_api/routes/public/test_variables.py index 3cbab24878ac7..fac8b27472449 100644 --- a/tests/api_fastapi/core_api/routes/public/test_variables.py +++ b/tests/api_fastapi/core_api/routes/public/test_variables.py @@ -433,113 +433,6 @@ def test_post_should_respond_422_when_value_is_null(self, test_client): } -class TestImportVariables(TestVariableEndpoint): - @pytest.mark.enable_redact - @pytest.mark.parametrize( - "variables_data, behavior, expected_status_code, expected_created_count, expected_created_keys, expected_conflict_keys", - [ - ( - {"new_key1": "new_value1", "new_key2": "new_value2"}, - "overwrite", - 200, - 2, - {"new_key1", "new_key2"}, - set(), - ), - ( - {"new_key1": "new_value1", "new_key2": "new_value2"}, - "skip", - 200, - 2, - {"new_key1", "new_key2"}, - set(), - ), - ( - {"test_variable_key": "new_value", "new_key": "new_value"}, - "fail", - 409, - 0, - set(), - {"test_variable_key"}, - ), - ( - {"test_variable_key": "new_value", "new_key": "new_value"}, - "skip", - 200, - 1, - {"new_key"}, - {"test_variable_key"}, - ), - ( - {"test_variable_key": "new_value", "new_key": "new_value"}, - "overwrite", - 200, - 2, - {"test_variable_key", "new_key"}, - set(), - ), - ], - ) - def test_import_variables( - self, - test_client, - variables_data, - behavior, - expected_status_code, - expected_created_count, - expected_created_keys, - expected_conflict_keys, - session, - ): - """Test variable import with different behaviors (overwrite, fail, skip).""" - - self.create_variables() - - file = create_file_upload(variables_data) - response = test_client.post( - "/public/variables/import", - files={"file": ("variables.json", file, "application/json")}, - params={"action_if_exists": behavior}, - ) - - assert response.status_code == expected_status_code - - if expected_status_code == 200: - body = response.json() - assert body["created_count"] == expected_created_count - assert set(body["created_variable_keys"]) == expected_created_keys - - elif expected_status_code == 409: - body = response.json() - assert ( - f"The variables with these keys: {expected_conflict_keys} already exists." == body["detail"] - ) - - def test_import_invalid_json(self, test_client): - """Test invalid JSON import.""" - file = BytesIO(b"import variable test") - response = test_client.post( - "/public/variables/import", - files={"file": ("variables.json", file, "application/json")}, - params={"action_if_exists": "overwrite"}, - ) - - assert response.status_code == 400 - assert "Invalid JSON format" in response.json()["detail"] - - def test_import_empty_file(self, test_client): - """Test empty file import.""" - file = create_file_upload({}) - response = test_client.post( - "/public/variables/import", - files={"file": ("empty_variables.json", file, "application/json")}, - params={"action_if_exists": "overwrite"}, - ) - - assert response.status_code == 422 - assert response.json()["detail"] == "No variables found in the provided JSON." - - class TestBulkVariables(TestVariableEndpoint): @pytest.mark.enable_redact @pytest.mark.parametrize(