diff --git a/lib/service/activity_pub/activity_pub.ex b/lib/service/activity_pub/activity_pub.ex
index 9d33677f9..6b6f6b74b 100644
--- a/lib/service/activity_pub/activity_pub.ex
+++ b/lib/service/activity_pub/activity_pub.ex
@@ -29,7 +29,7 @@ defmodule Eventos.Service.ActivityPub do
         }
 
       # Notification.create_notifications(activity)
-      #stream_out(activity)
+      # stream_out(activity)
       {:ok, activity}
     else
       %Activity{} = activity -> {:ok, activity}
@@ -37,6 +37,20 @@ defmodule Eventos.Service.ActivityPub do
     end
   end
 
+#  def stream_out(%Activity{} = activity) do
+#    if activity.data["type"] in ["Create", "Announce"] do
+#      Pleroma.Web.Streamer.stream("user", activity)
+#
+#      if Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
+#        Pleroma.Web.Streamer.stream("public", activity)
+#
+#        if activity.local do
+#          Pleroma.Web.Streamer.stream("public:local", activity)
+#        end
+#      end
+#    end
+#  end
+
   def fetch_event_from_url(url) do
     if object = Events.get_event_by_url!(url) do
       {:ok, object}
diff --git a/lib/service/activity_pub/transmogrifier.ex b/lib/service/activity_pub/transmogrifier.ex
index 02e9ea446..3954de89a 100644
--- a/lib/service/activity_pub/transmogrifier.ex
+++ b/lib/service/activity_pub/transmogrifier.ex
@@ -150,18 +150,18 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
 #    end
 #  end
 #
-#  def handle_incoming(
-#        %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
-#      ) do
-#    with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
-#         {:ok, object} <-
-#           get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
-#         {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
-#      {:ok, activity}
-#    else
-#      _e -> :error
-#    end
-#  end
+  def handle_incoming(
+        %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
+      ) do
+    with %Actor{} = actor <- Actors.get_or_fetch_by_url(actor),
+         {:ok, object} <-
+           get_obj_helper(object_id) || ActivityPub.fetch_event_from_url(object_id),
+         {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
+      {:ok, activity}
+    else
+      _e -> :error
+    end
+  end
 #
 #  def handle_incoming(
 #        %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
@@ -219,35 +219,35 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
 #  # Accept
 #  # Undo
 #
-#  def handle_incoming(_), do: :error
-#
-#  def get_obj_helper(id) do
-#    if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
-#  end
-#
-#  def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
-#    with false <- String.starts_with?(inReplyTo, "http"),
-#         {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
-#      Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
-#    else
-#      _e -> object
-#    end
-#  end
-#
-#  def set_reply_to_uri(obj), do: obj
+  def handle_incoming(_), do: :error
+
+  def get_obj_helper(id) do
+    if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
+  end
+
+  def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
+    with false <- String.starts_with?(inReplyTo, "http"),
+         {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
+      Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
+    else
+      _e -> object
+    end
+  end
+
+  def set_reply_to_uri(obj), do: obj
 #
 #  # Prepares the object of an outgoing create activity.
-#  def prepare_object(object) do
-#    object
+  def prepare_object(object) do
+    object
 #    |> set_sensitive
 #    |> add_hashtags
 #    |> add_mention_tags
 #    |> add_emoji_tags
-#    |> add_attributed_to
+    |> add_attributed_to
 #    |> prepare_attachments
-#    |> set_conversation
-#    |> set_reply_to_uri
-#  end
+    |> set_conversation
+    |> set_reply_to_uri
+  end
 
   @doc
   """
@@ -257,7 +257,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
   def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
     object =
       object
-      #|> prepare_object
+      |> prepare_object
 
     data =
       data
@@ -282,6 +282,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
       |> Map.from_struct
       |> Map.drop([:"__meta__"])
       |> Map.put(:"@context", "https://www.w3.org/ns/activitystreams")
+      |> prepare_object
 
     {:ok, event}
   end
@@ -360,21 +361,21 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
 #    |> Map.put("tag", tags ++ out)
 #  end
 #
-#  def set_conversation(object) do
-#    Map.put(object, "conversation", object["context"])
-#  end
+  def set_conversation(object) do
+    Map.put(object, "conversation", object["context"])
+  end
 #
 #  def set_sensitive(object) do
 #    tags = object["tag"] || []
 #    Map.put(object, "sensitive", "nsfw" in tags)
 #  end
 #
-#  def add_attributed_to(object) do
-#    attributedTo = object["attributedTo"] || object["actor"]
-#
-#    object
-#    |> Map.put("attributedTo", attributedTo)
-#  end
+  def add_attributed_to(object) do
+    attributedTo = object["attributedTo"] || object["actor"]
+
+    object
+    |> Map.put("attributedTo", attributedTo)
+  end
 #
 #  def prepare_attachments(object) do
 #    attachments =
diff --git a/lib/service/streamer.ex b/lib/service/streamer.ex
new file mode 100644
index 000000000..2656e36ba
--- /dev/null
+++ b/lib/service/streamer.ex
@@ -0,0 +1,145 @@
+defmodule Eventos.Service.Streamer do
+  use GenServer
+  require Logger
+  alias Eventos.Accounts.Actor
+
+  def init(args) do
+    {:ok, args}
+  end
+
+  def start_link do
+    spawn(fn ->
+      # 30 seconds
+      Process.sleep(1000 * 30)
+      GenServer.cast(__MODULE__, %{action: :ping})
+    end)
+
+    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+  end
+
+  def add_socket(topic, socket) do
+    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+  end
+
+  def remove_socket(topic, socket) do
+    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
+  end
+
+  def stream(topic, item) do
+    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+  end
+
+  def handle_cast(%{action: :ping}, topics) do
+    Map.values(topics)
+    |> List.flatten()
+    |> Enum.each(fn socket ->
+      Logger.debug("Sending keepalive ping")
+      send(socket.transport_pid, {:text, ""})
+    end)
+
+    spawn(fn ->
+      # 30 seconds
+      Process.sleep(1000 * 30)
+      GenServer.cast(__MODULE__, %{action: :ping})
+    end)
+
+    {:noreply, topics}
+  end
+
+#  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
+#    topic = "user:#{item.user_id}"
+#
+#    Enum.each(topics[topic] || [], fn socket ->
+#      json =
+#        %{
+#          event: "notification",
+#          payload:
+#            Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
+#              socket.assigns["user"],
+#              item
+#            )
+#            |> Jason.encode!()
+#        }
+#        |> Jason.encode!()
+#
+#      send(socket.transport_pid, {:text, json})
+#    end)
+#
+#    {:noreply, topics}
+#  end
+
+  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+    Logger.debug("Trying to push to users")
+
+    recipient_topics =
+      User.get_recipients_from_activity(item)
+      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn topic ->
+      push_to_socket(topics, topic, item)
+    end)
+
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(topics, topic, item)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
+    sockets_for_topic = sockets[topic] || []
+    sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
+    sockets = Map.put(sockets, topic, sockets_for_topic)
+    Logger.debug("Got new conn for #{topic}")
+    {:noreply, sockets}
+  end
+
+  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
+    sockets_for_topic = sockets[topic] || []
+    sockets_for_topic = List.delete(sockets_for_topic, socket)
+    sockets = Map.put(sockets, topic, sockets_for_topic)
+    Logger.debug("Removed conn for #{topic}")
+    {:noreply, sockets}
+  end
+
+  def handle_cast(m, state) do
+    Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
+    {:noreply, state}
+  end
+
+  def push_to_socket(topics, topic, item) do
+    Enum.each(topics[topic] || [], fn socket ->
+      # Get the current user so we have up-to-date blocks etc.
+      user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
+      blocks = user.info["blocks"] || []
+
+      unless item.actor in blocks do
+        json =
+          %{
+            event: "update",
+            payload:
+              Pleroma.Web.MastodonAPI.StatusView.render(
+                "status.json",
+                activity: item,
+                for: user
+              )
+              |> Jason.encode!()
+          }
+          |> Jason.encode!()
+
+        send(socket.transport_pid, {:text, json})
+      end
+    end)
+  end
+
+  defp internal_topic("user", socket) do
+    "user:#{socket.assigns[:user].id}"
+  end
+
+  defp internal_topic(topic, _), do: topic
+end