Java Thread. Прерываем ассинхронный поток правильно

версия для печати
Thread lifecycle

Постараюсь не разводить воду. Есть такая задача (реальная, но для статьи все совпадения вымышленные )): юзер через админку в нашу API запрашивает обновление статистики рассылки. Это сложная процедура, много собирать, взаимодействовать с чужой API, которая нестабильно работает. Короче - это время. Поэтому мы юзеру в браузер сразу отвечаем, что запрос приняли, а выполнять его будем в фоне. Когда закончим, пришлем уведомление в Slack.

Оставляем юзеру возможность отменить сбор статы. Т.е. это другой API роут, куда он обратится, чтобы остановить задачу.

А теперь - реализация в реалиях Spring Boot 2.7.16 + Kotlin. Версия Java под капотом - openJDK 11. Да, мы отстали от жизни. Но забегая вперед, в 17й такая же дичь творится.

Не буду расписывать то, что в интернетах хорошо объяснено. Я въехал в пня именно с обработкой прерывания. А когда раскопал с исходниках, как это все таки работает - афигел от невероятности увиденного. В плохом смысле слова.

Чтобы включить поддержку ассинхронности в Spring, почти ничего не надо. Только конфиг, который магически подхватывается при запуске приложения. Например, его можно оформть в отдельном классе (так у нас принято):

package io.mediacloud.sendpulsestats.config

import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync

@Configuration
@EnableAsync      // <----- Нужное заклинание
class AsyncConfig

Извините за унылую подсветку кода. Некогда этим заниматься.

Теперь опишем контроллер:

package io.mediacloud.sendpulsestats.rest

import io.mediacloud.sendpulsestats.rest.dto.UpdateStatsRequest
import io.mediacloud.sendpulsestats.service.StatsHarvester
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.validation.annotation.Validated
import org.springframework.web.bind.annotation.*
import java.util.concurrent.Future
import javax.validation.Valid

@RequestMapping("/api/admin/v1/stats")
@RestController
@Validated
class StatsController(val harvester: StatsHarvester)
{
    var updateThreads: MutableMap<Long, Future<String>> = mutableMapOf()

    @PostMapping("/update", produces = [MediaType.APPLICATION_JSON_VALUE])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun updateStats(@Valid @RequestBody request: UpdateStatsRequest): String {
        // Некоторые проверки пропущены.
        updateThreads[request.responderExtId] = harvester.asyncUpdateResponderStats(request)
        return """{
            "status": 202,
            "message": "Statistics are now updated in the background. Please wait for the notification in Slack."
            }"""
    }

    @PostMapping("/update/{responderExtId}/stop")
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun stopUpdateStats(@PathVariable responderExtId: Long) {
        updateThreads[responderExtId]?.cancel(true)
        updateThreads.remove(responderExtId)
    }
}

Два роута. Ассинхронный вызов метода сразу возвращает результат, по которому мы потом будем тушить фоновый поток. Т.к. юзер может запросить сбор статы по нескольким рассылкам сразу, нужно хранить привязку на каждый соответствущий поток отдельно, поэтому MutableMap.

Неудобство #1: мы не получаем ссылку на поток, только какую-то обертку в виде будущего результата. Почему неудобно? Потому что потушить поток напрямую не сможем, метод Future.cancel(boolean mayInterruptIfRunning) выглядит, как костыли. Особенно с возможностью не прерывать выполнение потока этим методом отмены. А нахрена, спрашивается, такая логика?!

Ну а теперь "вся остальная гребанная сова (с)":

package io.mediacloud.sendpulsestats.service

import io.mediacloud.sendpulsestats.rest.dto.UpdateStatsRequest
import mu.KotlinLogging
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.AsyncResult
import org.springframework.stereotype.Service
import java.util.concurrent.Future

@Service
class StatsHarvester()
{
    private val logger = KotlinLogging.logger {}

    @Async
    fun asyncUpdateResponderStats(request: UpdateStatsRequest): Future<String> {
        logger.debug("Async test starting.")
        for (i in 1..20) {
            Thread.sleep(1000)
            logger.debug("Async test. Tick for id=${request.responderExtId}.")
            if (Thread.currentThread().isInterrupted()) {
                logger.debug("Async test interrupted.")
                break
            }
        }
        logger.debug("Async test finished.")
        sendNotification(responder)
        return AsyncResult("whatever")
    }
}

Аннотация @Async отправит метод выполняться в отдельном потоке. Черная магия в действии.

Цикл имитирует долгую работу в потоке. Пауза не просто для имитации, в реальном коде коде она тоже есть. Там она нужна, чтобы дать возможность чужой API подняться перед повтором запроса к ней. В общем, этот код близок к реальному решению.

Очередной WTF. Смотрим возвращаемое значение и то, что заявлено после сигнатуры метода. Future - это интерфейс, который реализуется, в частности, классом AsyncResult. Но если явно заявить, что возвращаем AsyncResult - получится ошибка. Не знаю, почему, в этом месте я не полез в исходники.

Кстати, нам этот AsyncResult нужен только для того, чтобы поток остановить. Больше нигде не используется, поэтому возвращаемое значение нам неважно ("whatever").

Представленный код не будет работать, как ожидается. Все потому, что Thread.sleep() кидает исключение InterruptedException, когда получает команду прервать выполнение паузы. Если найти в исходниках реализацию Thread.sleep(), там ничего нет. Это native-функция, она вызывается из библиотеки на другом языке. В 17 версии расширили возможности метода, суть не изменилась. Короче, тут мы имеем только исключение.

Что странно, само исключение в логах не светится. Хотя это, возможно, проблема именно нашего приложения, у нас тут много наворочено.

Но если в вашем коде нет такой паузы, то прерывание потока становится чем-то необязательным. Т.е. под капотом Future.cancel() просто переключает внутренний флаг Thread о том, что был запрос на прерывание. И работа продолжается. Мило.. И поэтому в вашем ассинхронном методе где-нибудь нужна явная обработка с проверкой Thread.currentThread().isInterrupted().

Вообще, это логично (если бы не исключение из sleep): родительский процесс запросил прервать работу, и вы можете выполнить graceful shutdown метода там, где это удобно. Прибраться, например, записать что-нибудь в базу и лог. И выйти. Тут у меня нет претензий.

А теперь приколы. Под капотом Thread в 11й Java вокруг isInterrupted() творится мутная херня. Если вызвать статический метод Thread.isInterrupted() он вернет флаг, было ли прерывание и при этом сбросит его! Т.е. это метод-действие, а не метод-чтение (нарушение CQRS). Если же вызвать его перегруженную динамическую версию через Thread.currentThread().isInterrupted(), тогда флаг не сбрасывается. В 17й версии немного изменились названия методов, но суть осталасть прежней.

Это грабли в высокой траве: если где-то в вашем коде несколько раз будет проверка Thread.isInterrupted(), то только первый раз покажет реальное положение дел. При этом несколько проверок isInterrupted вполне себе возможны, если в фоне выполнятся распределенная логика и в каждом классе своя обработка на прерывание потока выполнения.

Теперь давайте разберемся, как по уму все это должно бы быть в Java:

  • Метод sleep() должен быть оберткой над вызовом native-метода. В нем нужно перехватывать исключение InterruptedException и переключать флаг в Thread о том, что было требование прерваться. И не ронять код исключением.
  • Метод isInterrupted() ни в каких реализациях не должен сбрасывать флаг. Это метод-чтение, не надо вводить разработчика в заблуждение.

И я не знаю, почему решения именно такие, какие есть. Один мой коллега обожает такое обосновывать неоспоримым "значит так надо было, не дураки же писали.". Я не знаю, кто это писал и под какими веществами. Я видел много примеров в разных языках и популярных движках, где в исходниках бред написан. Дураки ли писали? Может и нет. Я склоняюсь к мысли, что изначально подобные решения были проще, а их авторы неопытны. Потом решение развивалось (авторы тоже, хотя не факт), но переписать по-нормальному уже нельзя, потому что на старом "фундаменте" работают чужие приложения. Очень много приложений. И без поддержки обратной совместимости можно обрушить мир.

В текущих реалиях нужно переписать вышеприведенный код и постараться не забыть, почему именно так сделано:

@Async
fun asyncUpdateResponderStats(request: UpdateStatsRequest): Future<String> {
    logger.debug("Async test starting.")
    try {
        for (i in 1..20) {
            Thread.sleep(1000)
            logger.debug("Async test. Tick for id=${request.responderExtId}.")
            if (Thread.currentThread().isInterrupted()) {
                throw InterruptedException("Async test interrupted.")
            }
        }
        logger.debug("Async test finished.")
        sendNotification(responder)
    } catch (e: InterruptedException) {
        logger.info(e.message)
    }
    return AsyncResult("whatever")
}

Мне такое решение видится меньшим из зол.

Я допускаю, что это у меня бред, и я просто не разобрался, как с этим работать. И может быть, когда я стану опытнее, я решу удалить эту статью, как лепет java-джуна. Но пока я стою на своем.


Исключения из потока - вынос мозга

Выше я отмечал, что исключение из потока не засветилось. А это потому что они не всплывают из потока, если ассинхронный метод возвращает реализацию Future! Т.е. если ничего не возвращать, тогда исключения всплывают. Иначе - начинается неведомая херня.

Самый простой способ вытянуть исключения - вызвать в клиентском коде (в нашем примере это контроллер) метод Future.get(). Он вернет результат ассинхронного метода ("whatever") либо исключение, если оно было. Ну допустим.. вот только в случае с прерыванием в Thread.sleep() оттуда долетит уже не InterruptedException, а java.util.concurrent.CancellationException! Я не понимаю, что происходит :(

Простое решение нам не подходит. Ассинхронный вызов превращается в синхронный, т.к. придется ждать, когда во Future.get() будет результат.

Есть решения сложнее. Написать кучу кода в конфигурировании ассинхронной поддержки, и в частности - перехватчик исключений в потоках. Более-менее коротко описано тут. Но он мне никуда не уперся, мне нужно чтобы исключение просто всплывало в одном потоке и обратывалось его перехватчиком. Какого ж все так сложно?

Ситуация еще хуже. Свой перехватчик исключения нужен, если асинхронный метод ничего не возвращает. Это не наш случай. И что еще интересно, если свой перехватчик не описать (и ничего не возвращать), то исключения прекрасно вплывают сами.

Я не смог решить эту ситуацию на уровне Java/Spring. Но так как мы пишем на Kotlin, в нем есть сопрограммы (corountines). Вот через них сделал все, что нужно. Но это совсем другая история.

[1oo%, EoF]

Понравилась статья? Расскажите о ней друзьям:

Метки: java, кодинг

Комментарии
Для работы модуля комментариев включите javaScript


Показать/скрыть правила
Имя
[i] [b] [u] [s] [url]
:-) ;-) :D *lol* 8-) :-* :-| :-( *cry* :o :-? *unsure* *oops* :-x *shocked* *zzz* :P *evil*

Осталось 1000 символов.
Код защиты от спама Обновить код
Каждый комментарий проходит ручную модерацию. 100% фильтрация спама.
Продвижение
Время
Метки