From 2f0a29aa866c75b241bef3521ac4c3b689b996e7 Mon Sep 17 00:00:00 2001 From: Thomas Citharel Date: Sat, 19 May 2018 20:29:11 +0200 Subject: [PATCH] Add some methods Signed-off-by: Thomas Citharel --- lib/service/activity_pub/activity_pub.ex | 16 ++- lib/service/activity_pub/transmogrifier.ex | 89 ++++++------- lib/service/streamer.ex | 145 +++++++++++++++++++++ 3 files changed, 205 insertions(+), 45 deletions(-) create mode 100644 lib/service/streamer.ex diff --git a/lib/service/activity_pub/activity_pub.ex b/lib/service/activity_pub/activity_pub.ex index 9d33677f9..6b6f6b74b 100644 --- a/lib/service/activity_pub/activity_pub.ex +++ b/lib/service/activity_pub/activity_pub.ex @@ -29,7 +29,7 @@ defmodule Eventos.Service.ActivityPub do } # Notification.create_notifications(activity) - #stream_out(activity) + # stream_out(activity) {:ok, activity} else %Activity{} = activity -> {:ok, activity} @@ -37,6 +37,20 @@ defmodule Eventos.Service.ActivityPub do end end +# def stream_out(%Activity{} = activity) do +# if activity.data["type"] in ["Create", "Announce"] do +# Pleroma.Web.Streamer.stream("user", activity) +# +# if Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do +# Pleroma.Web.Streamer.stream("public", activity) +# +# if activity.local do +# Pleroma.Web.Streamer.stream("public:local", activity) +# end +# end +# end +# end + def fetch_event_from_url(url) do if object = Events.get_event_by_url!(url) do {:ok, object} diff --git a/lib/service/activity_pub/transmogrifier.ex b/lib/service/activity_pub/transmogrifier.ex index 02e9ea446..3954de89a 100644 --- a/lib/service/activity_pub/transmogrifier.ex +++ b/lib/service/activity_pub/transmogrifier.ex @@ -150,18 +150,18 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do # end # end # -# def handle_incoming( -# %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data -# ) do -# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor), -# {:ok, object} <- -# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id), -# {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do -# {:ok, activity} -# else -# _e -> :error -# end -# end + def handle_incoming( + %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data + ) do + with %Actor{} = actor <- Actors.get_or_fetch_by_url(actor), + {:ok, object} <- + get_obj_helper(object_id) || ActivityPub.fetch_event_from_url(object_id), + {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do + {:ok, activity} + else + _e -> :error + end + end # # def handle_incoming( # %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} = @@ -219,35 +219,35 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do # # Accept # # Undo # -# def handle_incoming(_), do: :error -# -# def get_obj_helper(id) do -# if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil -# end -# -# def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do -# with false <- String.starts_with?(inReplyTo, "http"), -# {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do -# Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo) -# else -# _e -> object -# end -# end -# -# def set_reply_to_uri(obj), do: obj + def handle_incoming(_), do: :error + + def get_obj_helper(id) do + if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil + end + + def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do + with false <- String.starts_with?(inReplyTo, "http"), + {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do + Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo) + else + _e -> object + end + end + + def set_reply_to_uri(obj), do: obj # # # Prepares the object of an outgoing create activity. -# def prepare_object(object) do -# object + def prepare_object(object) do + object # |> set_sensitive # |> add_hashtags # |> add_mention_tags # |> add_emoji_tags -# |> add_attributed_to + |> add_attributed_to # |> prepare_attachments -# |> set_conversation -# |> set_reply_to_uri -# end + |> set_conversation + |> set_reply_to_uri + end @doc """ @@ -257,7 +257,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do object = object - #|> prepare_object + |> prepare_object data = data @@ -282,6 +282,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do |> Map.from_struct |> Map.drop([:"__meta__"]) |> Map.put(:"@context", "https://www.w3.org/ns/activitystreams") + |> prepare_object {:ok, event} end @@ -360,21 +361,21 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do # |> Map.put("tag", tags ++ out) # end # -# def set_conversation(object) do -# Map.put(object, "conversation", object["context"]) -# end + def set_conversation(object) do + Map.put(object, "conversation", object["context"]) + end # # def set_sensitive(object) do # tags = object["tag"] || [] # Map.put(object, "sensitive", "nsfw" in tags) # end # -# def add_attributed_to(object) do -# attributedTo = object["attributedTo"] || object["actor"] -# -# object -# |> Map.put("attributedTo", attributedTo) -# end + def add_attributed_to(object) do + attributedTo = object["attributedTo"] || object["actor"] + + object + |> Map.put("attributedTo", attributedTo) + end # # def prepare_attachments(object) do # attachments = diff --git a/lib/service/streamer.ex b/lib/service/streamer.ex new file mode 100644 index 000000000..2656e36ba --- /dev/null +++ b/lib/service/streamer.ex @@ -0,0 +1,145 @@ +defmodule Eventos.Service.Streamer do + use GenServer + require Logger + alias Eventos.Accounts.Actor + + def init(args) do + {:ok, args} + end + + def start_link do + spawn(fn -> + # 30 seconds + Process.sleep(1000 * 30) + GenServer.cast(__MODULE__, %{action: :ping}) + end) + + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) + end + + def remove_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + end + + def stream(topic, item) do + GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) + end + + def handle_cast(%{action: :ping}, topics) do + Map.values(topics) + |> List.flatten() + |> Enum.each(fn socket -> + Logger.debug("Sending keepalive ping") + send(socket.transport_pid, {:text, ""}) + end) + + spawn(fn -> + # 30 seconds + Process.sleep(1000 * 30) + GenServer.cast(__MODULE__, %{action: :ping}) + end) + + {:noreply, topics} + end + +# def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do +# topic = "user:#{item.user_id}" +# +# Enum.each(topics[topic] || [], fn socket -> +# json = +# %{ +# event: "notification", +# payload: +# Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification( +# socket.assigns["user"], +# item +# ) +# |> Jason.encode!() +# } +# |> Jason.encode!() +# +# send(socket.transport_pid, {:text, json}) +# end) +# +# {:noreply, topics} +# end + + def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(topics, topic, item) + end) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(topics, topic, item) + {:noreply, topics} + end + + def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do + topic = internal_topic(topic, socket) + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + {:noreply, sockets} + end + + def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do + topic = internal_topic(topic, socket) + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = List.delete(sockets_for_topic, socket) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Removed conn for #{topic}") + {:noreply, sockets} + end + + def handle_cast(m, state) do + Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") + {:noreply, state} + end + + def push_to_socket(topics, topic, item) do + Enum.each(topics[topic] || [], fn socket -> + # Get the current user so we have up-to-date blocks etc. + user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + blocks = user.info["blocks"] || [] + + unless item.actor in blocks do + json = + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: item, + for: user + ) + |> Jason.encode!() + } + |> Jason.encode!() + + send(socket.transport_pid, {:text, json}) + end + end) + end + + defp internal_topic("user", socket) do + "user:#{socket.assigns[:user].id}" + end + + defp internal_topic(topic, _), do: topic +end