From 2eb1a5b7b6d8b6a0b9426e7ee5a1fd04519dd7e2 Mon Sep 17 00:00:00 2001
From: Emelia Smith <ThisIsMissEm@users.noreply.github.com>
Date: Fri, 28 Jul 2023 12:06:29 +0200
Subject: [PATCH] Fix: Streaming server memory leak in HTTP EventSource cleanup
 (#26228)

---
 streaming/index.js | 26 +++++++++++++++++---------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/streaming/index.js b/streaming/index.js
index 010dee581..1913be81f 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
     callbacks.forEach(callback => callback(json));
   };
 
+  /**
+   * @callback SubscriptionListener
+   * @param {ReturnType<parseJSON>} json of the message
+   * @returns void
+   */
+
   /**
    * @param {string} channel
-   * @param {function(string): void} callback
+   * @param {SubscriptionListener} callback
    */
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
@@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
 
   /**
    * @param {string} channel
-   * @param {function(Object<string, any>): void} callback
+   * @param {SubscriptionListener} callback
    */
   const unsubscribe = (channel, callback) => {
     log.silly(`Removing listener for ${channel}`);
@@ -623,9 +629,9 @@ const startWorker = async (workerId) => {
    * @param {string[]} ids
    * @param {any} req
    * @param {function(string, string): void} output
-   * @param {function(string[], function(string): void): void} attachCloseHandler
+   * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
    * @param {boolean=} needsFiltering
-   * @returns {function(object): void}
+   * @returns {SubscriptionListener}
    */
   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
     const accountId = req.accountId || req.remoteAddress;
@@ -643,6 +649,7 @@ const startWorker = async (workerId) => {
     // The listener used to process each message off the redis subscription,
     // message here is an object with an `event` and `payload` property. Some
     // events also include a queued_at value, but this is being removed shortly.
+    /** @type {SubscriptionListener} */
     const listener = message => {
       const { event, payload } = message;
 
@@ -835,7 +842,7 @@ const startWorker = async (workerId) => {
       subscribe(`${redisPrefix}${id}`, listener);
     });
 
-    if (attachCloseHandler) {
+    if (typeof attachCloseHandler === 'function') {
       attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
     }
 
@@ -872,12 +879,13 @@ const startWorker = async (workerId) => {
   /**
    * @param {any} req
    * @param {function(): void} [closeHandler]
-   * @return {function(string[]): void}
+   * @returns {function(string[], SubscriptionListener): void}
    */
-  const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
+
+  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
     req.on('close', () => {
       ids.forEach(id => {
-        unsubscribe(id);
+        unsubscribe(id, listener);
       });
 
       if (closeHandler) {
@@ -1137,7 +1145,7 @@ const startWorker = async (workerId) => {
    * @typedef WebSocketSession
    * @property {any} socket
    * @property {any} request
-   * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
+   * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
    */
 
   /**