Рубрики
Без рубрики

Идите к разработчикам Java (или параллелизм Java настолько плох?!)

сравнение параллелизма между Java и Go. С тегами java, go, параллелизм, веселье.

Я ни в коем случае не являюсь экспертом в Го, скорее наоборот. В настоящее время я пытаюсь ознакомиться с ним. Я начал знакомиться с синтаксисом, памятью и моделью параллелизма. Как обычно для меня, я пытаюсь противопоставить это чему-то, что я уже знаю, например Java. Итак, я наткнулся на это интересное выступление , в котором великий Сайма представил модель параллелизма Go с некоторыми примерами. Слайды для выступления и примеры находятся здесь . Вскоре после выступления возник вопрос: подумайте о том, что потребовалось бы для реализации того же самого на других языках, таких как Java. Неужели это действительно так сложно? Я не был так уверен, я имею в виду, что в Java нет оператора ‘select’, и у него нет встроенных каналов, но не должно быть сложно воспроизвести примеры на Java или нет? Поэтому я подумал, что мог бы немного повеселиться, реализуя примеры на Java.

Перейти к параллелизму

Прежде чем перейти к примеру, это краткое изложение выступления (кстати, это классное выступление, так что я действительно предлагаю вам посмотреть его).

  1. Перейти к параллелизму
    • Модель параллелизма основана на последовательных процессах связи (Hoare, 1978)
    • Параллельные программы структурированы как независимые процессы, которые выполняются последовательно и взаимодействуют путем передачи сообщений.
    • ” Не общайтесь, делясь памятью, делитесь памятью, общаясь”
    • Примитивы Go: маршруты, каналы и оператор select
  2. Перейти к рутинным действиям
    • Это легкий поток (это не поток)
    • Канал обеспечивает связь между маршрутизаторами (аналогично синхронизированной очереди в Java)
    • Выберите мультиплексную связь между маршрутами

Пример

В приведенных примерах мы должны создать гипотетический клиент, который запрашивает сервисы Google (веб-сервисы, сервисы изображений и видео). В идеале мы хотели бы запрашивать эти службы параллельно и агрегировать ответы. Весь код для примеров находится в Весь код для примеров находится в .

Первый пример: параллельный запрос в поисковую систему Google

Вот как выглядит код go:

        func Google(query string) (results []Result) {
            c := make(chan Result)
            go func() { c <- Web(query) }()
            go func() { c <- Image(query) }()
            go func() { c <- Video(query) }()

            for i := 0; i < 3; i++ {
                result := <-c
                results = append(results, result)
            }
            return
        }

А как насчет Java? Мое решение включает в себя использование CompletableFuture, как показано ниже.

    public void google(String query) throws ExecutionException, InterruptedException {
        CompletableFuture[] futures = new CompletableFuture[] {
            supplyAsync(() -> web(query)),
            supplyAsync(() -> image(query)),
            supplyAsync(() -> video(query))
        };

        List result = new ArrayList<>();

        allOf(futures)
            .thenAccept((ignore) -> Arrays.stream(futures)
                                          .map(this::safeGet)
                                          .forEach(result::add)).get();
        // Note: calling get is necessary only because I want to print the result 
        // before returning from the function
        System.out.println(result);
    }

    protected String safeGet(Future future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "";
    }

Веб-сервисы, сервисы изображений и видео – это просто насмешки со случайными спящими режимами. Итак, в чем разница между кодом java и go one? Код Java немного более подробный, и код не использует передачу сообщений между потоками, как в Go, кроме того, они выглядят действительно похожими.

Давайте перейдем ко второму примеру.

Второй пример: тайм-аут

Что, если мы не хотим ждать медленных серверов? Мы можем использовать тайм-аут! Идея состоит в том, чтобы подождать, пока либо все серверы не ответят на наш запрос, либо истечет тайм-аут.

func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

Давайте посмотрим, как это будет выглядеть в java:

    public void googleWithTimeout(String query) throws ExecutionException, InterruptedException {
        // This is the first difference with the go example, the result array must             
        // be a synchronized list.
        // Go channel are completely thread safe, so it's totally okay to funnel 
        // data from multiple go routines to an array.
        List result = Collections.synchronizedList(new ArrayList<>());

        // this is not safe since it's possible that all the threads in the thread     
        // pool (default to ForkJoin) are busy, so the timer won't start
        CompletableFuture timeout = runAsync(() -> timeout(TIMEOUT_MILLIS));

        anyOf(
            allOf(runAsync(() -> result.add(web(query))),
                  runAsync(() -> result.add(image(query))),
                  runAsync(() -> result.add(video(query)))),
            timeout
        ).get();


        System.out.println(result);
    }

    protected Void timeout(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

В примере Java есть существенное отличие от примера go: задачи совместно используют результирующий массив, поэтому для работы кода Java нам нужен синхронизированный массив. С другой стороны, Go channel полностью потокобезопасен, поэтому совершенно нормально передавать данные из нескольких горутинов в массив. Как упоминалось в комментарии, использование тайм-аута не является полностью безопасным, действительно, возможно, что все потоки в пуле потоков (по умолчанию forkJoin) заняты так что таймер не запустится. Очевидно, что мы можем запустить поток с другим ExecutorService или просто вручную создать поток и запустить его.

    protected CompletableFuture timeout(int millis) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        final CompletableFuture timeout = new CompletableFuture<>();
        executorService.schedule(() -> {
            timeout.complete(null);
        }, millis, TimeUnit.MILLISECONDS);

        return timeout;
    }

Третий пример: Уменьшите задержку хвоста с помощью реплицированных поисковых серверов.

В go:

func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- First(query, Web1, Web2) }()
    go func() { c <- First(query, Image1, Image2) }()
    go func() { c <- First(query, Video1, Video2) }()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

где функция First определяется следующим образом:

func First(query string, replicas ...Search) Result {
    c := make(chan Result, len(replicas))
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

Давайте посмотрим на Java

    public void googleWithReplicatedServers(String query) throws ExecutionException, InterruptedException {
        List result = Collections.synchronizedList(new ArrayList<>());

        // Unfortunately this does not work as expected because the inner anyOf 
        // won't stop the other calls, so the result might end up having 
        // duplicates, i.e [some-image, some-image, some-video]
        anyOf(
            allOf(
                anyOf(runAsync(() -> result.add(web(query))), runAsync(() -> result.add(webReplica(query)))),
                anyOf(runAsync(() -> result.add(image(query))), runAsync(() -> result.add(imageReplica(query)))),
                anyOf(runAsync(() -> result.add(video(query))), runAsync(() -> result.add(videoReplica(query))))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

К сожалению, код не совсем работает, потому что, когда одно из будущих завершений добавит результат в массив, но выполнение другого будет продолжаться, вызывая дублирование результата. Давайте исправим это:

    // replicate servers and use the first response - fixing the problem mentioned 
    // earlier by using supplyAsync + thenAccept instead of runAsync
    public void googleWithReplicatedServers2(String query) throws ExecutionException, InterruptedException {
        List result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                anyOf(supplyAsync(() -> web(query)),
                      supplyAsync(() -> webReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> image(query)),
                      supplyAsync(() -> imageReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> video(query)),
                      supplyAsync(() -> videoReplica(query))).thenAccept((s) -> result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

    // same as above, but this time we use the function 'first', which is really 
    // just a wrapper around CompletableFuture.anyOf
    public void googleWithReplicatedServers3(String query) throws ExecutionException, InterruptedException {
        List result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                first(query, Google::web, Google::webReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::image, Google::imageReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::video, Google::videoReplica).thenAccept((s) ->  result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

Выводы

Помимо того факта, что я получил некоторое удовольствие от CompletableFuture, явным преимуществом Go на самом деле является тот факт, что модель параллелизма встроена в сам язык, что упрощает взаимодействие между различными агентами. С другой стороны, я не уверен, почему они отказались от поддержки ООП, например, классов. Я имею в виду, что плохого в ООП?

Оригинал: “https://dev.to/napicella/go-for-java-developers-or-is-the-java-concurrency-that-bad-6ff”