diff --git a/VERSION b/VERSION index 25ce11324..c15ef4be5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.8.11 \ No newline at end of file +1.8.12 \ No newline at end of file diff --git a/cloudbuild/staging/deploy.yaml b/cloudbuild/staging/deploy.yaml index 78a029b3e..c70244778 100644 --- a/cloudbuild/staging/deploy.yaml +++ b/cloudbuild/staging/deploy.yaml @@ -53,7 +53,7 @@ substitutions: _CLUSTER: main _COOKIE: default-${_CLUSTER} _NORMALIZED_IMAGE_TAG: ${_IMAGE_TAG} - _INSTANCE_TYPE: c2d-highcpu-2 + _INSTANCE_TYPE: c2d-highcpu-4 _INSTANCE_GROUP: instance-group-staging-${_CLUSTER} _IMAGE_TAG: $SHORT_SHA _TEMPLATE_NAME: logflare-staging-${_CLUSTER}-cluster-${_NORMALIZED_IMAGE_TAG} diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 48f9e4e03..ea5e15071 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -51,7 +51,7 @@ defmodule Logflare.Application do name: Logflare.V1SourceRegistry, keys: :unique, partitions: max(round(System.schedulers_online() / 8), 1)}, - {Task.Supervisor, name: Logflare.TaskSupervisor}, + {PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors}, {DynamicSupervisor, strategy: :one_for_one, name: Logflare.Endpoints.Cache}, {DynamicSupervisor, strategy: :one_for_one, @@ -75,11 +75,7 @@ defmodule Logflare.Application do conditional_children() ++ [ Logflare.ErlSysMon, - {Task.Supervisor, - name: Logflare.TaskSupervisor, - spawn_opt: [ - fullsweep_after: 1_000 - ]}, + {PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors}, {Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]}, Logflare.Repo, Logflare.Vault, diff --git a/lib/logflare/backends.ex b/lib/logflare/backends.ex index ea42964c5..ab233f332 100644 --- a/lib/logflare/backends.ex +++ b/lib/logflare/backends.ex @@ -1,6 +1,7 @@ defmodule Logflare.Backends do @moduledoc false + alias Logflare.Utils.Tasks alias Logflare.Backends.Adaptor alias Logflare.Backends.Backend alias Logflare.Backends.SourceRegistry @@ -18,7 +19,6 @@ defmodule Logflare.Backends do alias Logflare.SystemMetrics alias Logflare.PubSubRates alias Logflare.Cluster - alias Logflare.TaskSupervisor alias Logflare.Source.RecentLogsServer import Ecto.Query @@ -322,13 +322,11 @@ defmodule Logflare.Backends do end defp maybe_broadcast_and_route(source, log_events) do - Logflare.Utils.Tasks.start_child(fn -> - if source.metrics.avg < 5 do - Source.ChannelTopics.broadcast_new(log_events) - end + if source.metrics.avg < 5 do + Source.ChannelTopics.broadcast_new(log_events) + end - SourceRouting.route_to_sinks_and_ingest(log_events) - end) + SourceRouting.route_to_sinks_and_ingest(log_events) :ok end @@ -517,16 +515,16 @@ defmodule Logflare.Backends do nodes = Cluster.Utils.node_list_all() task = - Task.async(fn -> + Tasks.async(fn -> nodes |> Enum.map( - &Task.Supervisor.async({TaskSupervisor, &1}, __MODULE__, :list_recent_logs_local, [ - source - ]) + &Tasks.async(fn -> + :erpc.call(&1, __MODULE__, :list_recent_logs_local, [source], 10_000) + end) ) |> Task.yield_many() |> Enum.map(fn {%Task{pid: pid}, res} -> - res || Task.Supervisor.terminate_child(TaskSupervisor, pid) + res || Task.shutdown(pid) end) end) diff --git a/lib/logflare/google/bigquery/bigquery.ex b/lib/logflare/google/bigquery/bigquery.ex index 04a45495e..993f28ac0 100644 --- a/lib/logflare/google/bigquery/bigquery.ex +++ b/lib/logflare/google/bigquery/bigquery.ex @@ -24,6 +24,7 @@ defmodule Logflare.Google.BigQuery do alias Logflare.Billing.Plan alias Logflare.TeamUsers alias Logflare.Source.BigQuery.SchemaBuilder + alias Logflare.Utils.Tasks @type ok_err_tup :: {:ok, term} | {:error, term} @@ -285,7 +286,7 @@ defmodule Logflare.Google.BigQuery do for x <- [user | team_users], x.provider == "google", do: x.email if Enum.count(user.sources) > 0 do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> patch(dataset_id, emails, project_id, user.id) end) diff --git a/lib/logflare/google/resource_manager.ex b/lib/logflare/google/resource_manager.ex index ff06e3510..0b7de589d 100644 --- a/lib/logflare/google/resource_manager.ex +++ b/lib/logflare/google/resource_manager.ex @@ -11,6 +11,7 @@ defmodule Logflare.Google.CloudResourceManager do alias Logflare.User alias Logflare.TeamUsers alias Logflare.Billing + alias Logflare.Utils.Tasks def list_projects() do conn = GenUtils.get_conn() @@ -32,7 +33,7 @@ defmodule Logflare.Google.CloudResourceManager do def set_iam_policy(opts \\ [async: true]) def set_iam_policy(async: true) do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> set_iam_policy(async: false) end) + Tasks.start_child(fn -> set_iam_policy(async: false) end) end def set_iam_policy(async: false) do diff --git a/lib/logflare/source/email_notification_server.ex b/lib/logflare/source/email_notification_server.ex index ea45f1a4f..5afd9be07 100644 --- a/lib/logflare/source/email_notification_server.ex +++ b/lib/logflare/source/email_notification_server.ex @@ -9,6 +9,7 @@ defmodule Logflare.Source.EmailNotificationServer do alias Logflare.AccountEmail alias Logflare.Mailer alias Logflare.Backends + alias Logflare.Utils.Tasks def start_link(args) do source = Keyword.get(args, :source) @@ -40,7 +41,7 @@ defmodule Logflare.Source.EmailNotificationServer do user = Users.Cache.get_by(id: source.user_id) if source.notifications.user_email_notifications do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> AccountEmail.source_notification(user, rate, source) |> Mailer.deliver() end) end @@ -51,7 +52,7 @@ defmodule Logflare.Source.EmailNotificationServer do other_emails = String.split(stranger_emails, ",") for email <- other_emails do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> AccountEmail.source_notification_for_others(String.trim(email), rate, source) |> Mailer.deliver() end) @@ -63,7 +64,7 @@ defmodule Logflare.Source.EmailNotificationServer do team_user = TeamUsers.Cache.get_team_user(x) if team_user do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver() end) end diff --git a/lib/logflare/source/text_notification_server.ex b/lib/logflare/source/text_notification_server.ex index 64099b8de..a7002e4d8 100644 --- a/lib/logflare/source/text_notification_server.ex +++ b/lib/logflare/source/text_notification_server.ex @@ -11,6 +11,7 @@ defmodule Logflare.Source.TextNotificationServer do alias LogflareWeb.Router.Helpers, as: Routes alias LogflareWeb.Endpoint alias Logflare.Backends + alias Logflare.Utils.Tasks @twilio_phone "+16026006731" @@ -50,7 +51,7 @@ defmodule Logflare.Source.TextNotificationServer do body = "#{source.name} has #{rate} new event(s). See: #{source_link} " if source.notifications.user_text_notifications == true do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body) end) end @@ -61,7 +62,7 @@ defmodule Logflare.Source.TextNotificationServer do body = "#{source.name} has #{rate} new event(s). See: #{source_link} " if team_user do - Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> + Tasks.start_child(fn -> ExTwilio.Message.create(to: team_user.phone, from: @twilio_phone, body: body) end) end diff --git a/lib/logflare/source_schemas.ex b/lib/logflare/source_schemas.ex index 848a16576..89c655535 100644 --- a/lib/logflare/source_schemas.ex +++ b/lib/logflare/source_schemas.ex @@ -5,6 +5,7 @@ defmodule Logflare.SourceSchemas do alias Logflare.Repo alias Logflare.SourceSchemas.SourceSchema + alias Logflare.Google.BigQuery.SchemaUtils require Logger @@ -101,4 +102,61 @@ defmodule Logflare.SourceSchemas do def change_source_schema(%SourceSchema{} = source_schema, attrs \\ %{}) do SourceSchema.changeset(source_schema, attrs) end + + def format_schema(bq_schema, variant, to_merge \\ %{}) + + def format_schema(%SourceSchema{bigquery_schema: bq_schema}, :dot, to_merge) do + bq_schema + |> SchemaUtils.bq_schema_to_flat_typemap() + |> Enum.filter(fn + {_k, :map} -> false + _ -> true + end) + |> Enum.map(fn + {k, {:list, type}} -> {k, "#{type}[]"} + {k, v} -> {k, Atom.to_string(v)} + end) + |> Map.new() + |> Map.merge(to_merge) + end + + def format_schema(%SourceSchema{bigquery_schema: bq_schema}, :json_schema, to_merge) do + bq_schema + |> SchemaUtils.to_typemap() + |> typemap_to_json_schema() + |> Map.merge(to_merge) + |> Map.put( + "$schema", + "https://json-schema.org/draft/2020-12/schema" + ) + end + + defp typemap_to_json_schema(map) when is_map(map) do + properties = Enum.map(map, &typemap_to_json_schema/1) |> Map.new() + + %{ + "properties" => properties, + "type" => "object" + } + end + + defp typemap_to_json_schema({key, %{fields: fields, t: :map}}) do + {Atom.to_string(key), typemap_to_json_schema(fields)} + end + + defp typemap_to_json_schema({key, %{t: {:list, type}}}) do + {Atom.to_string(key), %{"type" => "array", "items" => %{"type" => Atom.to_string(type)}}} + end + + defp typemap_to_json_schema({key, %{t: :datetime}}), + do: {Atom.to_string(key), %{"type" => "number"}} + + defp typemap_to_json_schema({key, %{t: :integer}}), + do: {Atom.to_string(key), %{"type" => "number"}} + + defp typemap_to_json_schema({key, %{t: :float}}), + do: {Atom.to_string(key), %{"type" => "number"}} + + defp typemap_to_json_schema({key, %{t: type}}), + do: {Atom.to_string(key), %{"type" => Atom.to_string(type)}} end diff --git a/lib/logflare/utils/tasks.ex b/lib/logflare/utils/tasks.ex index 3066a4265..a2dba7435 100644 --- a/lib/logflare/utils/tasks.ex +++ b/lib/logflare/utils/tasks.ex @@ -1,6 +1,6 @@ defmodule Logflare.Utils.Tasks do @moduledoc """ - Utility functions for spawning supervised tasks with `Logflare.TaskSupervisor` + Utility functions for spawning supervised tasks with `Logflare.TaskSupervisors` https://hexdocs.pm/elixir/1.14/Task.Supervisor.html @@ -10,14 +10,22 @@ defmodule Logflare.Utils.Tasks do Linked to caller, linked to supervisor """ def async(func, opts \\ []) do - Task.Supervisor.async(Logflare.TaskSupervisor, func, opts) + Task.Supervisor.async( + {:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}}, + func, + opts + ) end @doc """ Not linked to caller, only to supervisor. """ def start_child(func, opts \\ []) do - Task.Supervisor.start_child(Logflare.TaskSupervisor, func, opts) + Task.Supervisor.start_child( + {:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}}, + func, + opts + ) end @doc """ @@ -25,8 +33,10 @@ defmodule Logflare.Utils.Tasks do Used for test teardown, to prevent ecto sandbox checkout errors. """ def kill_all_tasks do - Logflare.TaskSupervisor - |> Task.Supervisor.children() - |> Enum.map(&Task.Supervisor.terminate_child(Logflare.TaskSupervisor, &1)) + Logflare.TaskSupervisors + |> PartitionSupervisor.which_children() + |> Enum.map(fn {_, pid, _, _} -> + pid |> Task.Supervisor.children() |> Enum.map(&Task.Supervisor.terminate_child(pid, &1)) + end) end end diff --git a/lib/logflare_web/controllers/api/source_controller.ex b/lib/logflare_web/controllers/api/source_controller.ex index e5c36dbfb..f95e264b1 100644 --- a/lib/logflare_web/controllers/api/source_controller.ex +++ b/lib/logflare_web/controllers/api/source_controller.ex @@ -3,6 +3,7 @@ defmodule LogflareWeb.Api.SourceController do use OpenApiSpex.ControllerSpecs alias Logflare.Sources + alias Logflare.SourceSchemas alias Logflare.Backends alias LogflareWeb.OpenApi.Accepted alias LogflareWeb.OpenApi.Created @@ -11,6 +12,7 @@ defmodule LogflareWeb.Api.SourceController do alias LogflareWeb.OpenApiSchemas.Event alias LogflareWeb.OpenApiSchemas.Source + alias LogflareWeb.OpenApiSchemas action_fallback(LogflareWeb.Api.FallbackController) @@ -151,7 +153,7 @@ defmodule LogflareWeb.Api.SourceController do end end - operation(:removebackend, + operation(:remove_backend, summary: "Remove source backend", parameters: [ source_token: [in: :path, description: "Source Token", type: :string], @@ -173,4 +175,30 @@ defmodule LogflareWeb.Api.SourceController do |> json(source) end end + + operation(:show_schema, + summary: "Show source schema", + parameters: [token: [in: :path, description: "Source Token", type: :string]], + responses: %{ + 200 => OpenApiSchemas.SourceSchema.response(), + 404 => NotFound.response() + } + ) + + def show_schema(%{assigns: %{user: user}} = conn, %{"source_token" => token} = params) do + with source when not is_nil(source) <- Sources.get_by(token: token, user_id: user.id), + schema = SourceSchemas.get_source_schema_by(source_id: source.id) do + data = + if Map.get(params, "variant") == "dot" do + SourceSchemas.format_schema(schema, :dot) + else + SourceSchemas.format_schema(schema, :json_schema, %{ + :title => source.name, + :"$id" => ~p"/sources/#{source.token}/schema" + }) + end + + json(conn, data) + end + end end diff --git a/lib/logflare_web/open_api_schemas.ex b/lib/logflare_web/open_api_schemas.ex index 3c8a0730d..c83c76f2b 100644 --- a/lib/logflare_web/open_api_schemas.ex +++ b/lib/logflare_web/open_api_schemas.ex @@ -123,6 +123,12 @@ defmodule LogflareWeb.OpenApiSchemas do use LogflareWeb.OpenApi, properties: @properties, required: [:name] end + defmodule SourceSchema do + @properties %{} + + use LogflareWeb.OpenApi, properties: @properties, required: [] + end + defmodule RuleApiSchema do @properties %{ id: %Schema{type: :integer}, diff --git a/lib/logflare_web/router.ex b/lib/logflare_web/router.ex index ac87d4d76..77cab34db 100644 --- a/lib/logflare_web/router.ex +++ b/lib/logflare_web/router.ex @@ -391,6 +391,7 @@ defmodule LogflareWeb.Router do param: "token", only: [:index, :show, :create, :update, :delete] ) do + get "/schema", Api.SourceController, :show_schema get "/recent", Api.SourceController, :recent post "/backends/:backend_token", Api.SourceController, :add_backend delete "/backends/:backend_token", Api.SourceController, :remove_backend diff --git a/test/logflare/sources_schemas_test.exs b/test/logflare/sources_schemas_test.exs new file mode 100644 index 000000000..10e8906f9 --- /dev/null +++ b/test/logflare/sources_schemas_test.exs @@ -0,0 +1,66 @@ +defmodule Logflare.SourceSchemasTest do + @moduledoc false + use Logflare.DataCase + + alias Logflare.SourceSchemas + + describe "format_schema/3" do + setup do + insert(:plan, name: "Free") + user = insert(:user) + source = insert(:source, user: user) + %{user: user, source: source} + end + + test "dot notation with nested values", %{ + source: source + } do + schema = + insert(:source_schema, + source: source, + bigquery_schema: + TestUtils.build_bq_schema(%{ + "test" => %{"nested" => 123, "listical" => ["testing", "123"]} + }) + ) + + assert %{ + "test.nested" => "integer", + "timestamp" => "datetime", + "test.listical" => "string[]" + } = params = SourceSchemas.format_schema(schema, :dot) + + refute Map.get(params, "test") + end + + test "json schema ", %{ + source: source + } do + schema = + insert(:source_schema, + source: source, + bigquery_schema: + TestUtils.build_bq_schema(%{ + "test" => %{"nested" => 123, "listical" => ["testing", "123"]} + }) + ) + + assert %{ + "properties" => %{ + "test" => %{ + "type" => "object", + "properties" => %{ + "nested" => %{ + "type" => "number" + }, + "listical" => %{ + "type" => "array", + "items" => %{"type" => "string"} + } + } + } + } + } = SourceSchemas.format_schema(schema, :json_schema) + end + end +end diff --git a/test/logflare_web/controllers/api/source_controller_test.exs b/test/logflare_web/controllers/api/source_controller_test.exs index fe62b05ba..7616c38e7 100644 --- a/test/logflare_web/controllers/api/source_controller_test.exs +++ b/test/logflare_web/controllers/api/source_controller_test.exs @@ -197,6 +197,61 @@ defmodule LogflareWeb.Api.SourceControllerTest do end end + describe "show_schema/2" do + test "GET schema with dot syntax", %{conn: conn, user: user, sources: [source | _]} do + insert(:source_schema, + source: source, + bigquery_schema: + TestUtils.build_bq_schema(%{ + "test" => %{"nested" => 123} + }) + ) + + conn = + conn + |> add_access_token(user, "private") + |> get("/api/sources/#{source.token}/schema?variant=dot") + + # returns the source + assert %{ + "id" => "string", + "event_message" => "string", + "timestamp" => "datetime", + "test.nested" => "integer" + } = json_response(conn, 200) + end + + test "GET schema with json schema", %{conn: conn, user: user, sources: [source | _]} do + insert(:source_schema, + source: source, + bigquery_schema: + TestUtils.build_bq_schema(%{ + "test" => %{"nested" => 123, "listical" => ["testing", "123"]} + }) + ) + + %{name: source_name} = source + + conn = + conn + |> add_access_token(user, "private") + |> get("/api/sources/#{source.token}/schema") + + # returns the source + assert %{ + "$schema" => _, + "$id" => _, + "title" => ^source_name, + "type" => "object", + "properties" => %{ + "id" => %{"type" => "string"}, + "event_message" => %{"type" => "string"}, + "timestamp" => %{"type" => "number"} + } + } = json_response(conn, 200) + end + end + describe "add_backend/2" do test "attaches a backend", %{conn: conn, user: user, sources: [source | _]} do backend = insert(:backend, user: user)