Рубрики
Без рубрики

Управление членством в Кластере с помощью Etc

В этом посте мы хотим взглянуть на то, как мы можем использовать etc для управления членством в кластере в распределенном приложении. Помеченный java, распределенными системами, кластером и т.д.

В эпоху глобальных интернет-сервисов распределенные системы стали повсеместными. Однако для использования возможностей распределенных вычислений и хранения данных требуется координация действий участвующих сторон. Распределенные алгоритмы объединяют несколько физических компонентов в один логический компонент. Когда пользователь отправляет запрос в кластер балансировки нагрузки или распределенную базу данных, тот факт, что задействовано несколько процессов, должен быть прозрачным.

Кластер – это совокупность узлов, соединенных через сеть. Большинство распределенных алгоритмов требуют согласованного (или, по крайней мере, в конечном итоге согласованного) представления всех узлов, входящих в кластер. Например, в механизме распределенной обработки данных мы используем представление кластера, чтобы определить, как разделять и распределять данные. Как мы можем поддерживать согласованное представление о кластере внутри каждого участника?

Наша цель – поддерживать список участников в памяти на каждом узле. Когда узел присоединяется к кластеру или покидает его, нам необходимо обновить списки участников во всех узлах. В идеале мы также хотим обнаружить неработающие узлы, так как они могут быть не в состоянии отправить запрос на отпуск в случае аппаратного сбоя, ошибки нехватки памяти или аналогичной проблемы.

Как правило, существует два типа парадигм распределенной коммуникации, которые можно использовать для обмена обновлениями членства в кластере: децентрализованный и централизованный подходы. Децентрализованные подходы включают протоколы в стиле эпидемий или сплетен, которые распространяют информацию среди коллег без центрального координатора/единого источника правды. Централизованные подходы опираются на своего рода координатора, который действует как единый источник истины и распространяет обновленную информацию всем заинтересованным сторонам.

Протоколы в стиле сплетен стали популярными из-за их масштабируемости и отсутствия единой точки отказа. Поскольку все члены равны, их можно легко заменить. Однако в условиях одновременных изменений разрешение конфликтов и достижение консенсуса может оказаться сложной задачей. Вот почему многие приложения полагаются на внешнее приложение для последовательного управления и отслеживания информации о членстве. Популярными примерами таких служб координации являются Apache Zookeeper , Консул или etcd .

В этом посте мы хотим взглянуть на то, как мы можем использовать etc для управления членством в кластере в распределенном приложении. Мы объединим различные API etc, такие как хранилище ключевых значений, наблюдатели и арендаторы, для создания и ведения списка участников в памяти на наших узлах. Приложение написано на Java, и исходный код доступен на GitHub. Остальная часть поста структурирована следующим образом.

Во-первых, мы дадим обзор целевой архитектуры, представив различные функциональные возможности etc, необходимые на концептуальном уровне. После этого мы будем реализовывать дизайн шаг за шагом. Мы завершаем публикацию, резюмируя основные выводы и обсуждая потенциальные улучшения. Исходный код доступен на GitHub.

Целевая архитектура состоит из набора узлов приложения, образующих кластер, и т.д. Каждый узел сохраняет свои метаданные в хранилище etcd ключ-значение (KV) при присоединении к кластеру. Мы можем идентифицировать узел по случайно сгенерированному UUID.

Каждый узел подписывается на обновления членства через API etcd watch, чтобы обновить свое локальное состояние. Обнаружение сбоев осуществляется путем подключения метаданных узла к аренде. Если узлу не удастся сохранить аренду в силе из-за сбоя, он будет автоматически удален из кластера.

Для получения дополнительной информации об API etc вы можете проверить “”Взаимодействие с etc” . На следующей диаграмме показана настройка кластера из четырех узлов.

В следующем разделе мы будем реализовывать эту функциональность шаг за шагом.

Основы

В качестве первого шага мы реализуем класс, инкапсулирующий все функциональные возможности одного узла. Каждому узлу требуется подключение к etc и список участников. Давайте сначала посмотрим на весь файл, а затем пройдемся по нему шаг за шагом.

package de.frosner.server;

import ...

public class Node implements AutoCloseable {

  private final NodeData nodeData;

  private final Client etcdClient;

  private final ConcurrentHashMap clusterMembers = 
    new ConcurrentHashMap<>();

  public Node(List endpoints) {
    nodeData = new NodeData(UUID.randomUUID());
    etcdClient = Client.builder().endpoints(endpoints).build();
  }

  public void join() throws JoinFailedException {
    // TODO
  }

  public void leave() throws LeaveFailedException {
    // TODO
  }

  public Set getClusterMembers() {
    return ImmutableSet.copyOf(clusterMembers.values());
  }

  public NodeData getNodeData() {
    return nodeData;
  }

  @Override
  public void close() {
    leave();
    etcdClient.close();
  }
}

Мы хотим связать метаданные с каждым узлом. Класс Данные узла хранит эту информацию. Метаданные могут быть специфичными для системы, например, время, когда узел присоединился к кластеру, или специфичными для приложения, например, разделы, за которые отвечает узел в случае распределенной базы данных. Для простоты у нас будет только UUID внутри Данных узла .

Для связи с etc мы будем использовать jet cd . У каждого узла есть клиент etcd, который подключается к нашему центральному кластеру etcd. Список участников будет представлен в виде ConcurrentHashMap данные узла> , чтобы в дальнейшем мы могли безопасно взаимодействовать с ним из разных потоков. данные узла>

Мы также создали заглушки для join() и оставить() методы, и реализованные Автоклавируемый таким образом, мы можем использовать узел внутри инструкции try-with-resources. Исключение Присоединиться Не удалось и Оставить Неудачное исключение являются пользовательскими исключениями, которые мы создали, чтобы указать, что что-то пошло не так во время присоединения или выхода из кластера.

Затем мы создадим скелет тестирования, чтобы мы могли проверить нашу реализацию с помощью автоматических тестов. Благодаря удивительному Тестовые контейнеры библиотека очень легко создать сервер etcd в рамках жизненного цикла тестирования. Вот и начинается тестовый класс:

package de.frosner.server;

import ...

@Testcontainers
class NodeTest {

  private static final Network network = Network.newNetwork();
  private static final int ETCD_PORT = 2379;

  private ToxiproxyContainer.ContainerProxy etcdProxy;

  @AfterAll
  private static void afterAll() {
    network.close();
  }

  @Container
  private static final GenericContainer etcd =
    new GenericContainer<>(EtcdContainer.ETCD_DOCKER_IMAGE_NAME)
      .withCommand("etcd",
        "-listen-client-urls", "http://0.0.0.0:" + ETCD_PORT,
        "--advertise-client-urls", "http://0.0.0.0:" + ETCD_PORT,
        "--name", NodeTest.class.getSimpleName())
      .withExposedPorts(ETCD_PORT)
      .withNetwork(network);

  @Container
  public static final ToxiproxyContainer toxiproxy = 
    new ToxiproxyContainer("shopify/toxiproxy:2.1.0")
      .withNetwork(network)
      .withNetworkAliases("toxiproxy");

  @BeforeEach
  public void beforeEach() {
      etcdProxy = toxiproxy.getProxy(etcd, ETCD_PORT);
  }

  private List getClientEndpoints() {
    return List.of(URI.create(
      "https://" + etcd.getContainerIpAddress() +
        ":" + etcd.getMappedPort(ETCD_PORT)
    ));
  }

  private List getProxiedClientEndpoints() {
    return List.of(URI.create(
      "https://" + etcdProxy.getContainerIpAddress() + 
        ":" + etcdProxy.getProxyPort()
    ));
  }

  @Test
  public void testNodeJoin() throws Exception {
    try (Node node = new Node(getClientEndpoints())) {
      node.join();
    }
  }
}

Скелет содержит один тест, который заставляет узел присоединиться к кластеру, а затем закрывает его, заставляя его снова уйти. Поскольку мы еще не реализовали никаких функциональных возможностей, мы не ожидаем, что что-то произойдет.

Обратите внимание, что мы создаем пользовательскую сеть докеров и контейнер Toxiproxy . Для начальных тестов это не требуется, но нам это понадобится позже, когда мы захотим смоделировать сетевые сбои. Для простоты мы будем использовать только один узел etcd. В производственном сценарии у вас должен быть кластер etcd, состоящий как минимум из трех узлов.

Давайте теперь реализуем базовый алгоритм соединения.

Присоединение к кластеру

При присоединении к кластеру узел помещает свои метаданные в etc. Мы храним все метаданные узла в NODES_PREFIX , что позволяет нам позже отслеживать изменения членства на основе этого префикса.

public void join() throws JoinFailedException {
  try {
    putMetadata();
  } catch (Exception e) {
    throw new JoinFailedException(nodeData, e);
  }
}

private void putMetadata() throws Exception {
  etcdClient.getKVClient().put(
    ByteSequence.from(
      NODES_PREFIX + nodeData.getUuid(),
      StandardCharsets.UTF_8
    ),
    ByteSequence.from(
      JsonObjectMapper.INSTANCE.writeValueAsString(nodeData),
      StandardCharsets.UTF_8
    )
  ).get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
}

Учитывая эту реализацию, мы можем изменить существующий тестовый случай, чтобы запросить у etc метаданные узла.

@Test
public void testNodeJoin() throws Exception {
  try (Node node = new Node(getClientEndpoints())) {
    node.join();
    assertThat(getRemoteState(node.getNodeData()))
      .isEqualTo(node.getNodeData());
  }
}

private NodeData getRemoteState(NodeData node) throws Exception {
  String nodeDataJson = etcdClient.getKVClient()
    .get(ByteSequence.from(Node.NODES_PREFIX + node.getUuid(),
      StandardCharsets.UTF_8))
    .get(Node.OPERATION_TIMEOUT, TimeUnit.SECONDS)
    .getKvs()
    .get(0)
    .getValue()
    .toString(StandardCharsets.UTF_8);
  return JsonObjectMapper.INSTANCE
    .readValue(nodeDataJson, NodeData.class);
}

Теперь узел может присоединиться к кластеру, но он не заметит, когда присоединятся и другие узлы. Итак, давайте реализуем эту функциональность дальше.

Обновление Членства в Кластере

При создании нового объекта узла мы хотим поддерживать список участников в актуальном состоянии. Для этого мы сначала загружаем существующий снимок метаданных кластера, а затем отслеживаем изменения, начиная с последней замеченной версии. Обновленный конструктор выглядит следующим образом:

public Node(List endpoints, long leaseTtl) throws Exception {
  nodeData = new NodeData(UUID.randomUUID());
  etcdClient = Client.builder().endpoints(endpoints).build();
  long maxModRevision = loadMembershipSnapshot();
  watchMembershipChanges(maxModRevision + 1);
}

Загрузка моментального снимка выполняется с помощью API значения ключа путем предоставления префикса в качестве дополнительного getOption . Затем мы заполняем члены кластера на основе возвращенных значений и вычисляем максимальную ревизию данных.

private long loadMembershipSnapshot() throws Exception {
  GetResponse response = etcdClient.getKVClient().get(
    ByteSequence.from(NODES_PREFIX, StandardCharsets.UTF_8),
    GetOption.newBuilder()
      .withPrefix(ByteSequence.from(NODES_PREFIX, StandardCharsets.UTF_8))
      .build()
  ).get(OPERATION_TIMEOUT, TimeUnit.SECONDS);

  for (KeyValue kv : response.getKvs()) {
    NodeData nodeData = JsonObjectMapper.INSTANCE.readValue(
      kv.getValue().toString(StandardCharsets.UTF_8),
      NodeData.class
    );
    clusterMembers.put(nodeData.getUuid(), nodeData);
  }

  return response.getKvs().stream()
    .mapToLong(KeyValue::getModRevision).max().orElse(0);
}

Используя API-интерфейс watch, мы можем создать часы для одного и того же префикса, начиная со следующей редакции, поэтому мы не теряем никаких изменений членства, которые могут произойти между снимком и запросом наблюдения. Мы обрабатываем входящие события просмотра в отдельной функции обрабатываем событие просмотра .

private void watchMembershipChanges(long fromRevision) {
  logger.info("Watching membership changes from revision {}", fromRevision);
  watcher = etcdClient.getWatchClient().watch(
    ByteSequence.from(NODES_PREFIX, StandardCharsets.UTF_8),
    WatchOption.newBuilder()
      .withPrefix(ByteSequence.from(NODES_PREFIX, StandardCharsets.UTF_8))
      .withRevision(fromRevision)
      .build(),
    watchResponse -> {
      watchResponse.getEvents().forEach(this::handleWatchEvent);
    },
    error -> logger.error("Watcher broke", error),
    () -> logger.info("Watcher completed")
  );
}

Ответ на наблюдение может содержать ПОЛОЖИТЬ или УДАЛИТЬ события в зависимости от того, присоединяются ли узлы к кластеру или покидают его. События PUT содержат обновленные метаданные узла, которые мы можем добавить в члены кластера . |/УДАЛИТЬ события содержат удаленный ключ, из которого мы можем извлечь UUID узла для соответствующего обновления Членов кластера

private void handleWatchEvent(WatchEvent watchEvent) {
  try {
    switch (watchEvent.getEventType()) {
      case PUT:
        NodeData nodeData = JsonObjectMapper.INSTANCE.readValue(
          watchEvent.getKeyValue().getValue().toString(StandardCharsets.UTF_8),
          NodeData.class
        );
        clusterMembers.put(nodeData.getUuid(), nodeData);
        break;
      case DELETE:
        String etcdKey = watchEvent.getKeyValue().getKey()
          .toString(StandardCharsets.UTF_8);
        UUID nodeUuid = UUID.fromString(extractNodeUuid(etcdKey));
        clusterMembers.remove(nodeUuid);
        break;
      default:
        logger.warn("Unrecognized event: {}", watchEvent.getEventType());
    }
  } catch (Exception e) {
    throw new RuntimeException("Failed to handle watch event", e);
  }
}

private String extractNodeUuid(String etcdKey) {
  return etcdKey.replaceAll(Pattern.quote(NODES_PREFIX), "");
}

Учитывая нашу новую функциональность для обновления списка участников, мы можем создать новый тестовый случай, в котором два узла присоединяются к кластеру, и ожидать, что в конечном итоге это отразится на локальном состоянии каждого узла. Благодаря Ожидание цифровая абонентская линия мы можем спокойно подождать, пока произойдет окончательное обновление.

@Test
public void testTwoNodesJoin() throws Exception {
  try (Node node1 = new Node(getClientEndpoints())) {
    node1.join();
    try (Node node2 = new Node(getClientEndpoints())) {
      node2.join();
      Awaitility.await("Node 1 to see all nodes")
        .until(() -> node1.getClusterMembers()
        .containsAll(List.of(node1.getNodeData(), node2.getNodeData())));
      Awaitility.await("Node 2 to see all nodes")
        .until(() -> node2.getClusterMembers()
        .containsAll(List.of(node1.getNodeData(), node2.getNodeData())));
    }
  }
}

Далее давайте посмотрим, как мы можем обнаружить отказавшие узлы и автоматически удалить их из кластера.

Обнаружение сбоев

Обнаружение сбоев будет выполняться простым централизованным детектором сбоев сердцебиения. Etc предоставляет API аренды для этой цели. Срок аренды истекает через заданное количество времени, если только они не будут сохранены в силе. Мы сохраним идентификатор аренды и клиента keep alive в новых полях, чтобы очистить аренду при последующем выезде.

private volatile long leaseId;

private volatile CloseableClient keepAliveClient;

Теперь мы изменяем метод join , чтобы сначала запросить разрешение на аренду, прежде чем размещать метаданные.

public void join() throws JoinFailedException {
  try {
    grantLease();
    putMetadata();
  } catch (Exception e) {
    throw new JoinFailedException(nodeData, e);
  }
}

Предоставление аренды осуществляется с помощью API аренды. Когда договор аренды будет заключен, мы должны сохранить его в живых. Мы можем предоставить StreamObserver , который реагирует на успешные, неудачные или завершенные операции поддержания активности, как показано в следующем коде.

private void grantLease() throws Exception {
  Lease leaseClient = etcdClient.getLeaseClient();
  leaseClient.grant(5) // 5 sec TTL
    .thenAccept((leaseGrantResponse -> {
      leaseId = leaseGrantResponse.getID();
      logger.info("Lease {} granted", leaseId);
      keepAliveClient = leaseClient.keepAlive(leaseId,
        new StreamObserver<>() {
          @Override
          public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
            // you can increment some metric counter here
          }
          @Override
          public void onError(Throwable throwable) {
            // log and handle error
          }
          @Override
          public void onCompleted() {
            // we're done, nothing to do
          }
        });
    })).get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
}

Метаданные узла прикрепляются к недавно приобретенной аренде, поэтому они автоматически удаляются по истечении срока аренды или удаляются.

private void putMetadata() throws Exception {
  etcdClient.getKVClient().put(
    ByteSequence.from(
      NODES_PREFIX + nodeData.getUuid(),
      StandardCharsets.UTF_8
    ),
    ByteSequence.from(
      JsonObjectMapper.INSTANCE.writeValueAsString(nodeData),
      StandardCharsets.UTF_8
    ),
    PutOption.newBuilder().withLeaseId(leaseId).build()
  ).get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
}

Чтобы проверить функциональность аренды, мы используем модуль Тестовые контейнеры Toxiproxy , чтобы ввести сетевую задержку, превышающую TTL аренды, что приведет к удалению отказавшего узла.

@Test
public void testTwoNodesLeaseExpires() throws Exception {
  try (Node node1 = new Node(getClientEndpoints())) {
    node1.join();
    try (Node node2 = new Node(getProxiedClientEndpoints())) {
      node2.join();

      Awaitility.await("Node 1 to see all nodes")
        .until(() -> node1.getClusterMembers()
          .containsAll(List.of(node1.getNodeData(), node2.getNodeData())));

      etcdProxy.toxics()
        .latency("latency", ToxicDirection.UPSTREAM, 6000);

      Awaitility.await("Node 1 to see that node 2 is gone")
        .until(() -> node1.getClusterMembers()
          .equals(Set.of(node1.getNodeData())));
    }
  }
}

Обратите внимание, что дополнительные действия могут быть добавлены в качестве реакции на аренду, которую не удалось сохранить. Например, узлы могут попытаться воссоединиться с кластером. Очевидно, что конкретные действия зависят от приложения. И последнее, но не менее важное: давайте выполним операцию изящного отпуска.

Покидая кластер

Покинуть кластер так же просто, как отозвать договор аренды. Etc автоматически удалит все ключи, связанные с арендой, по существу удалив метаданные узла.

public void leave() throws LeaveFailedException {
  try {
    logger.info("Leaving the cluster");
    if (keepAliveClient != null) {
      keepAliveClient.close();
    }
    etcdClient.getLeaseClient().revoke(leaseId)
      .get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
  } catch (Exception e) {
    throw new LeaveFailedException(nodeData, e);
  }
}

Мы расширяем набор тестов, добавляя тестовый случай, в котором узел присоединяется и покидает, а остальные узлы должны наблюдать за изменениями членства.

@Test
public void testTwoNodesJoinLeave() throws Exception {
  try (Node node1 = new Node(getClientEndpoints())) {
    node1.join();
    try (Node node2 = new Node(getClientEndpoints())) {
      node2.join();
      Awaitility.await("Node 1 to see all nodes")
        .until(() -> node1.getClusterMembers()
          .containsAll(List.of(node1.getNodeData(), node2.getNodeData())));
      Awaitility.await("Node 2 to see all nodes")
        .until(() -> node2.getClusterMembers()
          .containsAll(List.of(node1.getNodeData(), node2.getNodeData())));
    }
    Awaitility.await("Node 1 to see that node 2 is gone")
      .until(() -> node1.getClusterMembers()
        .equals(Set.of(node1.getNodeData())));
  }
}

Вот и все! У нас есть рабочая реализация узла, который может присоединяться к кластеру и покидать его, а также управлять членством через etc!

В этом посте мы реализовали очень простое распределенное приложение. Etc управляет членством в кластере и распространяет его через API-интерфейс “ключ-значение” и API-интерфейс просмотра, но также действует как детектор сбоев благодаря API аренды. Внедрение автоматизированных тестов было простым благодаря тестовым контейнерам. Модуль Toxiproxy обеспечивает удобный способ имитации неисправностей во время выполнения теста.

Обратите внимание, что написанный нами код Java – это только основа. В зависимости от задач, которые должно выполнять ваше распределенное приложение, вам придется, например, добавить функциональность в алгоритм присоединения и выхода. Etc также предоставляет API блокировки, который вы можете использовать для добавления дополнительной координации.

Оригинал: “https://dev.to/frosnerd/managing-cluster-membership-with-etcd-l0k”