gtask: ensure task is destroyed on its context's thread

The GTask must be destroyed on the thread that is running its
GMainContext, i.e. the thread that started the task. It must never be
destroyed on the actual task thread when running with
g_task_run_in_thread(), because when it is destroyed, it will unref its
source object and destroy its task data (if a GDestroyNotify was set for
the data using g_task_set_task_data()). The source object and task data
might not be safe to destroy on a secondary thread, so dropping the
task's last reference there is incorrect. We have to ensure they are
destroyed on the task's context's thread.

There are different ways we could do this, but the simplest by far is to
ensure the task thread has unreffed the task before the context's thread
executes the callback, which is easy to do with a condition variable.
g_task_run_in_thread_sync() now waits on the same condition variable,
replacing the per-task GCond it used previously. We have to keep a
static global map of all GTasks with outstanding task threads, which is
slightly unfortunate, but we already have a bunch of global data in this
file for managing the thread pool, and the map will only contain tasks
that are currently running in threads, so it should be small.

Fixes #1346
diff --git a/gio/gtask.c b/gio/gtask.c
index 9950bb3..5502d56 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -558,7 +558,6 @@
 
   GTaskThreadFunc task_func;
   GMutex lock;
-  GCond cond;
 
   /* This can’t be in the bit field because we access it from TRACE(). */
   gboolean thread_cancelled;
@@ -616,6 +615,46 @@
 static guint64 task_wait_time;
 static gint tasks_running;
 
+static GHashTable *completion_synchronizer_table; /* GTask -> ThreadedTaskCompletionSynchronizer */
+static GMutex completion_synchronizer_table_mutex;
+
+typedef struct {
+  GMutex mutex; /* protects finished */
+  GCond condition; /* signalled by the task thread after it sets finished */
+  gboolean finished; /* TRUE once the task thread has dropped its task reference */
+  gatomicrefcount refcount; /* one ref held by the table, one taken briefly by the task thread */
+} ThreadedTaskCompletionSynchronizer;
+
+static ThreadedTaskCompletionSynchronizer *
+threaded_task_completion_synchronizer_new (void)
+{
+  ThreadedTaskCompletionSynchronizer *synchronizer = g_new (ThreadedTaskCompletionSynchronizer, 1);
+
+  g_mutex_init (&synchronizer->mutex);
+  g_cond_init (&synchronizer->condition);
+  synchronizer->finished = FALSE; /* nobody has signalled completion yet */
+  g_atomic_ref_count_init (&synchronizer->refcount); /* caller owns the initial reference */
+
+  return synchronizer;
+}
+
+static void
+threaded_task_completion_synchronizer_ref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+  g_atomic_ref_count_inc (&synchronizer->refcount); /* atomic increment of the shared count */
+}
+
+static void
+threaded_task_completion_synchronizer_unref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+  if (g_atomic_ref_count_dec (&synchronizer->refcount)) /* TRUE only when the last reference goes away */
+    {
+      g_mutex_clear (&synchronizer->mutex);
+      g_cond_clear (&synchronizer->condition);
+      g_free (synchronizer);
+    }
+}
+
 /* When the task pool fills up and blocks, and the program keeps
  * queueing more tasks, we will slowly add more threads to the pool
  * (in case the existing tasks are trying to queue subtasks of their
@@ -660,13 +699,10 @@
     task->result_destroy (task->result.pointer);
 
   if (task->error)
-      g_error_free (task->error);
+    g_error_free (task->error);
 
   if (G_TASK_IS_THREADED (task))
-    {
-      g_mutex_clear (&task->lock);
-      g_cond_clear (&task->cond);
-    }
+    g_mutex_clear (&task->lock);
 
   G_OBJECT_CLASS (g_task_parent_class)->finalize (object);
 }
@@ -1220,9 +1256,38 @@
   g_main_context_pop_thread_default (task->context);
 }
 
-static gboolean
-complete_in_idle_cb (gpointer task)
+static void
+wait_for_threaded_task_completion (GTask *task)
 {
+  ThreadedTaskCompletionSynchronizer *synchronizer;
+
+  /* Block until the task thread has marked this task's synchronizer as
+   * finished, so the task's source object and task data are never destroyed
+   * on the task thread. NOTE(review): this stalls the calling thread until
+   * that signal arrives; confirm this is acceptable for return-on-cancel. */
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  synchronizer = g_hash_table_lookup (completion_synchronizer_table, task); /* borrowed; no ref is taken here */
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+  g_assert (synchronizer != NULL); /* every task passed here must have a table entry */
+  g_mutex_lock (&synchronizer->mutex);
+  while (!synchronizer->finished) /* re-check the flag after every wakeup */
+    g_cond_wait (&synchronizer->condition, &synchronizer->mutex);
+  g_mutex_unlock (&synchronizer->mutex);
+
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  g_hash_table_remove (completion_synchronizer_table, task); /* entry is no longer needed once finished */
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+}
+
+static gboolean
+complete_in_idle_cb (gpointer user_data)
+{
+  GTask *task = user_data;
+
+  if (G_TASK_IS_THREADED (task))
+    wait_for_threaded_task_completion (task);
+
   g_task_return_now (task);
   g_object_unref (task);
   return FALSE;
@@ -1343,9 +1408,7 @@
   if (task->cancellable)
     g_signal_handlers_disconnect_by_func (task->cancellable, task_thread_cancelled, task);
 
-  if (task->synchronous)
-    g_cond_signal (&task->cond);
-  else
+  if (!task->synchronous)
     g_task_return (task, G_TASK_RETURN_FROM_THREAD);
 }
 
@@ -1404,6 +1467,7 @@
                            gpointer pool_data)
 {
   GTask *task = thread_data;
+  ThreadedTaskCompletionSynchronizer *synchronizer;
 
   g_task_thread_setup ();
 
@@ -1412,6 +1476,26 @@
   g_task_thread_complete (task);
   g_object_unref (task);
 
+  /* Now we have to ensure that the task is not destroyed in this thread, since
+   * that would cause its source object and user data to be destroyed, and that
+   * needs to happen back in the original thread. So signal to the original
+   * thread that we are done. Note we cannot use task at this point except for
+   * its address.
+   */
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+  g_assert (synchronizer != NULL);
+  threaded_task_completion_synchronizer_ref (synchronizer);
+
+  g_mutex_lock (&synchronizer->mutex);
+  synchronizer->finished = TRUE;
+  g_cond_signal (&synchronizer->condition);
+  g_mutex_unlock (&synchronizer->mutex);
+
+  threaded_task_completion_synchronizer_unref (synchronizer);
+
   g_task_thread_cleanup ();
 }
 
@@ -1454,8 +1538,19 @@
 g_task_start_task_thread (GTask           *task,
                           GTaskThreadFunc  task_func)
 {
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  if (g_hash_table_contains (completion_synchronizer_table, task))
+    {
+      g_critical ("Task is already running in thread");
+      g_mutex_unlock (&completion_synchronizer_table_mutex);
+      return;
+    }
+
+  g_hash_table_insert (completion_synchronizer_table, task,
+                       threaded_task_completion_synchronizer_new ());
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+
   g_mutex_init (&task->lock);
-  g_cond_init (&task->cond);
 
   g_mutex_lock (&task->lock);
 
@@ -1571,12 +1666,10 @@
 
   task->synchronous = TRUE;
   g_task_start_task_thread (task, task_func);
-
-  while (!task->thread_complete)
-    g_cond_wait (&task->cond, &task->lock);
-
   g_mutex_unlock (&task->lock);
 
+  wait_for_threaded_task_completion (task);
+
   TRACE (GIO_TASK_BEFORE_RETURN (task, task->source_object,
                                  NULL  /* callback */,
                                  NULL  /* callback data */));
@@ -2067,6 +2160,10 @@
   g_source_attach (task_pool_manager,
                    GLIB_PRIVATE_CALL (g_get_worker_context ()));
   g_source_unref (task_pool_manager);
+
+  completion_synchronizer_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+                                                         NULL,
+                                                         (GDestroyNotify)threaded_task_completion_synchronizer_unref);
 }
 
 static void
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 9b2c691..284cbb2 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -1408,6 +1408,114 @@
   g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
 }
 
+/* test_run_in_thread_destruction: source object and user data must be
+ * destroyed on the original thread, not the task thread
+ */
+
+#define SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER (source_object_destruction_thread_checker_get_type())
+
+G_DECLARE_FINAL_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, SOURCE_OBJECT, DESTRUCTION_THREAD_CHECKER, GObject)
+
+struct _SourceObjectDestructionThreadChecker
+{
+  GObject parent_instance; /* no fields of its own; the type exists only for its finalize check */
+};
+
+G_DEFINE_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, G_TYPE_OBJECT)
+
+static void
+source_object_destruction_thread_checker_finalize (GObject *object)
+{
+  g_assert_true (main_thread == g_thread_self ()); /* fails if finalized on any thread other than main_thread */
+
+  G_OBJECT_CLASS (source_object_destruction_thread_checker_parent_class)->finalize (object);
+}
+
+static void
+source_object_destruction_thread_checker_class_init (SourceObjectDestructionThreadCheckerClass *klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+  object_class->finalize = source_object_destruction_thread_checker_finalize; /* run the thread check on finalization */
+}
+
+static void
+source_object_destruction_thread_checker_init (SourceObjectDestructionThreadChecker *self)
+{ /* no per-instance state to set up */
+}
+
+static void
+user_data_destruction_thread_checker (gpointer user_data)
+{
+  g_assert_true (main_thread == g_thread_self ()); /* installed as the task data GDestroyNotify; must run on main_thread */
+}
+
+static void
+destruction_thread_test_cb (GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+  g_assert_true (main_thread == g_thread_self ()); /* the task callback must be dispatched on main_thread */
+  g_main_loop_quit ((GMainLoop *)user_data); /* user_data is the GMainLoop the test passed to g_task_new() */
+}
+
+static void
+destruction_thread_test_thread (GTask        *task,
+                                gpointer      source_object,
+                                gpointer      task_data,
+                                GCancellable *cancellable)
+{
+  g_assert_false (main_thread == g_thread_self ()); /* the task function must not run on main_thread */
+  g_object_unref (source_object); /* drops the reference created by the test body; presumably leaves the task's own as the last one */
+}
+
+static void
+test_run_in_thread_destruction (void)
+{
+  SourceObjectDestructionThreadChecker *source_object;
+  GMainLoop *main_loop;
+  GTask *task;
+  int i;
+
+  g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346");
+
+  main_loop = g_main_loop_new (NULL, FALSE);
+
+  for (i = 0; i < 1000000; i++) /* NOTE(review): presumably repeated to provoke the race; confirm the runtime is acceptable */
+    {
+      source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+      task = g_task_new (source_object, NULL, destruction_thread_test_cb, main_loop);
+      g_task_set_task_data (task, NULL, user_data_destruction_thread_checker); /* NULL data; only the destroy notify matters */
+      g_task_run_in_thread (task, destruction_thread_test_thread);
+      g_main_loop_run (main_loop); /* returns once destruction_thread_test_cb() quits the loop */
+      g_object_unref (task); /* release the test's own reference to the task */
+    }
+
+  g_main_loop_unref (main_loop);
+}
+
+/* test_run_in_thread_sync_destruction: source object and user data
+ * must be destroyed on the original thread, not the task thread
+ */
+static void
+test_run_in_thread_sync_destruction (void)
+{
+  SourceObjectDestructionThreadChecker *source_object;
+  GTask *task;
+  int i;
+
+  g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346");
+
+  for (i = 0; i < 1000000; i++) /* NOTE(review): presumably repeated to provoke the race; confirm the runtime is acceptable */
+    {
+      source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+      task = g_task_new (source_object, NULL, NULL, NULL); /* no callback or callback data are supplied */
+      g_task_set_task_data (task, NULL, user_data_destruction_thread_checker); /* NULL data; only the destroy notify matters */
+      g_task_run_in_thread_sync (task, destruction_thread_test_thread);
+      g_object_unref (task); /* release the test's own reference to the task */
+    }
+}
+
 /* test_return_on_cancel */
 
 GMutex roc_init_mutex, roc_finish_mutex;
@@ -2345,6 +2453,8 @@
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
   g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
   g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
+  g_test_add_func ("/gtask/run-in-thread-destruction", test_run_in_thread_destruction);
+  g_test_add_func ("/gtask/run-in-thread-sync-destruction", test_run_in_thread_sync_destruction);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);