Add some methods
Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
parent
e47ff97ac6
commit
2f0a29aa86
|
@ -29,7 +29,7 @@ defmodule Eventos.Service.ActivityPub do
|
||||||
}
|
}
|
||||||
|
|
||||||
# Notification.create_notifications(activity)
|
# Notification.create_notifications(activity)
|
||||||
#stream_out(activity)
|
# stream_out(activity)
|
||||||
{:ok, activity}
|
{:ok, activity}
|
||||||
else
|
else
|
||||||
%Activity{} = activity -> {:ok, activity}
|
%Activity{} = activity -> {:ok, activity}
|
||||||
|
@ -37,6 +37,20 @@ defmodule Eventos.Service.ActivityPub do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# def stream_out(%Activity{} = activity) do
|
||||||
|
# if activity.data["type"] in ["Create", "Announce"] do
|
||||||
|
# Pleroma.Web.Streamer.stream("user", activity)
|
||||||
|
#
|
||||||
|
# if Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
|
||||||
|
# Pleroma.Web.Streamer.stream("public", activity)
|
||||||
|
#
|
||||||
|
# if activity.local do
|
||||||
|
# Pleroma.Web.Streamer.stream("public:local", activity)
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
|
||||||
def fetch_event_from_url(url) do
|
def fetch_event_from_url(url) do
|
||||||
if object = Events.get_event_by_url!(url) do
|
if object = Events.get_event_by_url!(url) do
|
||||||
{:ok, object}
|
{:ok, object}
|
||||||
|
|
|
@ -150,18 +150,18 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|
||||||
# end
|
# end
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# def handle_incoming(
|
def handle_incoming(
|
||||||
# %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
|
%{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
|
||||||
# ) do
|
) do
|
||||||
# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
|
with %Actor{} = actor <- Actors.get_or_fetch_by_url(actor),
|
||||||
# {:ok, object} <-
|
{:ok, object} <-
|
||||||
# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
|
get_obj_helper(object_id) || ActivityPub.fetch_event_from_url(object_id),
|
||||||
# {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
|
{:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
|
||||||
# {:ok, activity}
|
{:ok, activity}
|
||||||
# else
|
else
|
||||||
# _e -> :error
|
_e -> :error
|
||||||
# end
|
end
|
||||||
# end
|
end
|
||||||
#
|
#
|
||||||
# def handle_incoming(
|
# def handle_incoming(
|
||||||
# %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
|
# %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
|
||||||
|
@ -219,35 +219,35 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|
||||||
# # Accept
|
# # Accept
|
||||||
# # Undo
|
# # Undo
|
||||||
#
|
#
|
||||||
# def handle_incoming(_), do: :error
|
def handle_incoming(_), do: :error
|
||||||
#
|
|
||||||
# def get_obj_helper(id) do
|
def get_obj_helper(id) do
|
||||||
# if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
|
if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
|
||||||
# end
|
end
|
||||||
#
|
|
||||||
# def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
|
def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
|
||||||
# with false <- String.starts_with?(inReplyTo, "http"),
|
with false <- String.starts_with?(inReplyTo, "http"),
|
||||||
# {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
|
{:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
|
||||||
# Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
|
Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
|
||||||
# else
|
else
|
||||||
# _e -> object
|
_e -> object
|
||||||
# end
|
end
|
||||||
# end
|
end
|
||||||
#
|
|
||||||
# def set_reply_to_uri(obj), do: obj
|
def set_reply_to_uri(obj), do: obj
|
||||||
#
|
#
|
||||||
# # Prepares the object of an outgoing create activity.
|
# # Prepares the object of an outgoing create activity.
|
||||||
# def prepare_object(object) do
|
def prepare_object(object) do
|
||||||
# object
|
object
|
||||||
# |> set_sensitive
|
# |> set_sensitive
|
||||||
# |> add_hashtags
|
# |> add_hashtags
|
||||||
# |> add_mention_tags
|
# |> add_mention_tags
|
||||||
# |> add_emoji_tags
|
# |> add_emoji_tags
|
||||||
# |> add_attributed_to
|
|> add_attributed_to
|
||||||
# |> prepare_attachments
|
# |> prepare_attachments
|
||||||
# |> set_conversation
|
|> set_conversation
|
||||||
# |> set_reply_to_uri
|
|> set_reply_to_uri
|
||||||
# end
|
end
|
||||||
|
|
||||||
@doc
|
@doc
|
||||||
"""
|
"""
|
||||||
|
@ -257,7 +257,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|
||||||
def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
|
def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
|
||||||
object =
|
object =
|
||||||
object
|
object
|
||||||
#|> prepare_object
|
|> prepare_object
|
||||||
|
|
||||||
data =
|
data =
|
||||||
data
|
data
|
||||||
|
@ -282,6 +282,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|
||||||
|> Map.from_struct
|
|> Map.from_struct
|
||||||
|> Map.drop([:"__meta__"])
|
|> Map.drop([:"__meta__"])
|
||||||
|> Map.put(:"@context", "https://www.w3.org/ns/activitystreams")
|
|> Map.put(:"@context", "https://www.w3.org/ns/activitystreams")
|
||||||
|
|> prepare_object
|
||||||
|
|
||||||
{:ok, event}
|
{:ok, event}
|
||||||
end
|
end
|
||||||
|
@ -360,21 +361,21 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|
||||||
# |> Map.put("tag", tags ++ out)
|
# |> Map.put("tag", tags ++ out)
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# def set_conversation(object) do
|
def set_conversation(object) do
|
||||||
# Map.put(object, "conversation", object["context"])
|
Map.put(object, "conversation", object["context"])
|
||||||
# end
|
end
|
||||||
#
|
#
|
||||||
# def set_sensitive(object) do
|
# def set_sensitive(object) do
|
||||||
# tags = object["tag"] || []
|
# tags = object["tag"] || []
|
||||||
# Map.put(object, "sensitive", "nsfw" in tags)
|
# Map.put(object, "sensitive", "nsfw" in tags)
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# def add_attributed_to(object) do
|
def add_attributed_to(object) do
|
||||||
# attributedTo = object["attributedTo"] || object["actor"]
|
attributedTo = object["attributedTo"] || object["actor"]
|
||||||
#
|
|
||||||
# object
|
object
|
||||||
# |> Map.put("attributedTo", attributedTo)
|
|> Map.put("attributedTo", attributedTo)
|
||||||
# end
|
end
|
||||||
#
|
#
|
||||||
# def prepare_attachments(object) do
|
# def prepare_attachments(object) do
|
||||||
# attachments =
|
# attachments =
|
||||||
|
|
145
lib/service/streamer.ex
Normal file
145
lib/service/streamer.ex
Normal file
|
@ -0,0 +1,145 @@
|
||||||
|
defmodule Eventos.Service.Streamer do
|
||||||
|
use GenServer
|
||||||
|
require Logger
|
||||||
|
alias Eventos.Accounts.Actor
|
||||||
|
|
||||||
|
def init(args) do
|
||||||
|
{:ok, args}
|
||||||
|
end
|
||||||
|
|
||||||
|
def start_link do
|
||||||
|
spawn(fn ->
|
||||||
|
# 30 seconds
|
||||||
|
Process.sleep(1000 * 30)
|
||||||
|
GenServer.cast(__MODULE__, %{action: :ping})
|
||||||
|
end)
|
||||||
|
|
||||||
|
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_socket(topic, socket) do
|
||||||
|
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
|
||||||
|
end
|
||||||
|
|
||||||
|
def remove_socket(topic, socket) do
|
||||||
|
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream(topic, item) do
|
||||||
|
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :ping}, topics) do
|
||||||
|
Map.values(topics)
|
||||||
|
|> List.flatten()
|
||||||
|
|> Enum.each(fn socket ->
|
||||||
|
Logger.debug("Sending keepalive ping")
|
||||||
|
send(socket.transport_pid, {:text, ""})
|
||||||
|
end)
|
||||||
|
|
||||||
|
spawn(fn ->
|
||||||
|
# 30 seconds
|
||||||
|
Process.sleep(1000 * 30)
|
||||||
|
GenServer.cast(__MODULE__, %{action: :ping})
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:noreply, topics}
|
||||||
|
end
|
||||||
|
|
||||||
|
# def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
|
||||||
|
# topic = "user:#{item.user_id}"
|
||||||
|
#
|
||||||
|
# Enum.each(topics[topic] || [], fn socket ->
|
||||||
|
# json =
|
||||||
|
# %{
|
||||||
|
# event: "notification",
|
||||||
|
# payload:
|
||||||
|
# Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
|
||||||
|
# socket.assigns["user"],
|
||||||
|
# item
|
||||||
|
# )
|
||||||
|
# |> Jason.encode!()
|
||||||
|
# }
|
||||||
|
# |> Jason.encode!()
|
||||||
|
#
|
||||||
|
# send(socket.transport_pid, {:text, json})
|
||||||
|
# end)
|
||||||
|
#
|
||||||
|
# {:noreply, topics}
|
||||||
|
# end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
|
||||||
|
Logger.debug("Trying to push to users")
|
||||||
|
|
||||||
|
recipient_topics =
|
||||||
|
User.get_recipients_from_activity(item)
|
||||||
|
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|
||||||
|
|
||||||
|
Enum.each(recipient_topics, fn topic ->
|
||||||
|
push_to_socket(topics, topic, item)
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:noreply, topics}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
|
||||||
|
Logger.debug("Trying to push to #{topic}")
|
||||||
|
Logger.debug("Pushing item to #{topic}")
|
||||||
|
push_to_socket(topics, topic, item)
|
||||||
|
{:noreply, topics}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
|
||||||
|
topic = internal_topic(topic, socket)
|
||||||
|
sockets_for_topic = sockets[topic] || []
|
||||||
|
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
|
||||||
|
sockets = Map.put(sockets, topic, sockets_for_topic)
|
||||||
|
Logger.debug("Got new conn for #{topic}")
|
||||||
|
{:noreply, sockets}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
|
||||||
|
topic = internal_topic(topic, socket)
|
||||||
|
sockets_for_topic = sockets[topic] || []
|
||||||
|
sockets_for_topic = List.delete(sockets_for_topic, socket)
|
||||||
|
sockets = Map.put(sockets, topic, sockets_for_topic)
|
||||||
|
Logger.debug("Removed conn for #{topic}")
|
||||||
|
{:noreply, sockets}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(m, state) do
|
||||||
|
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def push_to_socket(topics, topic, item) do
|
||||||
|
Enum.each(topics[topic] || [], fn socket ->
|
||||||
|
# Get the current user so we have up-to-date blocks etc.
|
||||||
|
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
|
||||||
|
blocks = user.info["blocks"] || []
|
||||||
|
|
||||||
|
unless item.actor in blocks do
|
||||||
|
json =
|
||||||
|
%{
|
||||||
|
event: "update",
|
||||||
|
payload:
|
||||||
|
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||||
|
"status.json",
|
||||||
|
activity: item,
|
||||||
|
for: user
|
||||||
|
)
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
|
||||||
|
send(socket.transport_pid, {:text, json})
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp internal_topic("user", socket) do
|
||||||
|
"user:#{socket.assigns[:user].id}"
|
||||||
|
end
|
||||||
|
|
||||||
|
defp internal_topic(topic, _), do: topic
|
||||||
|
end
|
Loading…
Reference in a new issue