Skyframe StateMachines のガイド

問題を報告 ソースを表示 Nightly · 8.3 · 8.2 · 8.1 · 8.0 · 7.6

概要

Skyframe の StateMachine は、ヒープに存在する分解された関数オブジェクトです。必要な値がすぐに使用可能ではなく、非同期で計算される場合、冗長性なしで柔軟な評価をサポートします。1StateMachine は待機中にスレッド リソースを占有することはできません。代わりに、一時停止して再開する必要があります。このように分解することで、明示的な再エントリ ポイントが公開され、以前の計算をスキップできるようになります。

StateMachine は、シーケンス、分岐、構造化された論理的並行性を表現するために使用でき、Skyframe インタラクション専用に調整されています。StateMachine は、より大きな StateMachine に構成して、サブ StateMachine を共有できます。同時実行は常に階層構造であり、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。

はじめに

このセクションでは、java.com.google.devtools.build.skyframe.state パッケージにある StateMachine の動機と概要を簡単に説明します。

Skyframe の再起動の概要

Skyframe は、依存関係グラフの並列評価を実行するフレームワークです。グラフの各ノードは、パラメータを指定する SkyKey と結果を指定する SkyValue を使用した SkyFunction の評価に対応しています。計算モデルでは、SkyFunction が SkyKey で SkyValue をルックアップし、追加の SkyFunction の再帰的並列評価をトリガーします。スレッドを占有するブロッキングの代わりに、計算のサブグラフが不完全なため、リクエストされた SkyValue がまだ準備できていない場合、リクエスト元の SkyFunction は null getValue レスポンスを監視し、SkyValue ではなく null を返す必要があります。これは、入力が不足しているため不完全であることを示します。Skyframe は、以前にリクエストされたすべての SkyValue が利用可能になると、SkyFunctions を再起動します。

SkyKeyComputeState の導入前は、再起動を処理する従来の方法では、計算を完全に再実行していました。これは 2 次の複雑性がありますが、この方法で記述された関数は最終的に完了します。これは、再実行ごとに null を返すルックアップの数が減るためです。SkyKeyComputeState を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることができ、再計算を大幅に削減できます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトで、実行の一時停止と再開のフックを公開することで、SkyFunction が再起動したときに(SkyKeyComputeState がキャッシュから外れないと仮定して)再計算をほぼすべて排除します。

SkyKeyComputeState 内のステートフル コンピューティング

オブジェクト指向設計の観点から、純粋なデータ値ではなく、計算オブジェクトを SkyKeyComputeState 内に保存することを検討するのは理にかなっています。Java では、動作を伴うオブジェクトの最小限の説明は関数型インターフェースであり、これで十分です。StateMachine には、次の再帰的な定義があります。2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment と似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であり、ステップのシーケンスを帰納的に指定できます。StateMachine が完了すると、stepDONE を返します。次に例を示します。

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

次の出力で StateMachine を記述します。

hello
world

step2StateMachine の関数型インターフェース定義を満たしているため、メソッド参照 this::step2StateMachine であることに注意してください。メソッド参照は、StateMachine で次の状態を指定する最も一般的な方法です。

一時停止と再開

直感的に、計算をモノリシック関数ではなく StateMachine ステップに分割すると、計算を一時停止して再開するために必要なフックが提供されます。StateMachine.step が戻ると、明示的な一時停止ポイントがあります。返された StateMachine 値で指定された継続は、明示的な 再開ポイントです。計算を中断したところから正確に再開できるため、再計算を回避できます。

コールバック、継続、非同期計算

技術的には、StateMachine継続として機能し、実行する後続の計算を決定します。StateMachine は、ブロックするのではなく、step 関数から戻ることで自発的に 一時停止 し、制御を Driver インスタンスに戻します。Driver は、準備完了の StateMachine に切り替えるか、制御を Skyframe に戻すことができます。

従来、コールバック継続は 1 つの概念に統合されていました。ただし、StateMachine はこの 2 つを区別します。

  • コールバック - 非同期計算の結果を保存する場所を記述します。
  • 継続 - 次の実行状態を指定します。

非同期オペレーションを呼び出す場合はコールバックが必要です。つまり、SkyValue のルックアップの場合のように、メソッドを呼び出した直後に実際のオペレーションは発生しません。コールバックはできるだけシンプルに保つ必要があります。

継続StateMachineStateMachine 戻り値であり、すべての非同期計算が解決した後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。

タスク

Tasks インターフェースは、SkyKey で SkyValue を検索し、同時実行サブタスクをスケジュールする API を StateMachine に提供します。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue ルックアップ

StateMachineTasks.lookUp オーバーロードを使用して SkyValue を検索します。これらは SkyFunction.Environment.getValue および SkyFunction.Environment.getValueOrThrow に類似しており、同様の例外処理セマンティクスを持ちます。実装では、ルックアップはすぐには実行されず、可能な限り多くのルックアップがバッチ処理されてから実行されます。値がすぐに利用できない場合(Skyframe の再起動が必要な場合など)があるため、呼び出し元は、コールバックを使用して結果の値を処理する方法を指定します。

StateMachine プロセッサ(Driver と SkyFrame へのブリッジ)は、次の状態が始まる前に値が利用可能であることを保証します。以下の例を

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() のルックアップを行い、コンシューマーとして this を渡します。これは、DoesLookupConsumer<SkyValue> を実装しているためです。

を渡します。

契約により、次の状態 DoesLookup.processValue が開始される前に、DoesLookup.step のすべてのルックアップが完了します。したがって、processValue でアクセスされると value が利用可能になります。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine であり、通常の StateMachine ができることはすべて実行できます。たとえば、サブタスクを再帰的に作成したり、SkyValues を検索したりできます。lookUp と同様に、ステート マシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時実行ですが、すべてが単一のスレッドで実行されるため、i の「同時」更新には同期は必要ありません。

構造化された同時実行

すべての lookUpenqueue は次の状態に進む前に解決する必要があるため、並行処理は自然にツリー構造に制限されます。次の例に示すように、階層型5同時実行を作成できます。

構造化された同時実行

UML から、並行処理構造がツリーを形成していることを判断するのは困難です。ツリー構造をより適切に表示する代替ビューがあります。

構造化されていない同時実行

構造化された同時実行は、推論がはるかに容易です。

構成と制御フロー パターン

このセクションでは、複数の StateMachine を構成する方法の例と、特定の制御フローの問題の解決策を示します。

順次状態

これは、最も一般的でわかりやすい制御フロー パターンです。例については、SkyKeyComputeState 内のステートフルな計算をご覧ください。

分岐

StateMachine の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

特定のブランチが早期完了のために DONE を返すことはよくあります。

高度なシーケンシャル合成

StateMachine 制御構造はメモリレスであるため、StateMachine 定義をサブタスクとして共有すると、不自然になることがあります。M1M2StateMachine を共有する StateMachine インスタンスとし、M1M2 をそれぞれシーケンス <A, S, B><X, S, Y> とします。問題は、S が完了後に B または Y のどちらを続行すべきかわからないことと、StateMachine がコールスタックを完全に保持しないことです。このセクションでは、これを実現するための手法をいくつか紹介します。

StateMachine を終了シーケンス要素として使用

この方法では、最初に提示された問題は解決しません。共有 StateMachine がシーケンスの終端である場合にのみ、シーケンシャル コンポジションを示します。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑な状態マシンである場合でも機能します。

順次コンポジションのサブタスク

キューに登録されたサブタスクは次の状態の前に完了することが保証されているため、サブタスク メカニズムを少しだけ悪用できる場合があります。6

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp 呼び出しがあるため、Tasks.enqueue を悪用できない場合があります。この場合、runAfter パラメータを S に挿入して、次の処理を S に通知できます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを濫用するよりもクリーンです。ただし、複数の StateMachinerunAfter でネストするなど、この方法を過度に適用すると、コールバック地獄に陥ります。代わりに、通常のシーケンシャル状態を使用してシーケンシャル runAfter を分割することをおすすめします。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

に置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、エラーが発生するとすぐに中止する runAfterUnlessError を検討していました。これは、エラーが 2 回チェックされることが多いという事実に基づいています。1 回は runAfter 参照を持つ StateMachine によって、もう 1 回は runAfter マシン自体によってチェックされます。

検討の結果、エラーチェックの重複排除よりもコードの均一性が重要であると判断しました。runAfter メカニズムが tasks.enqueue メカニズムと一貫した方法で動作しないと、混乱を招く可能性があります。tasks.enqueue メカニズムでは常にエラー チェックが必要です。

直接委任

正式な状態遷移が発生するたびに、メインの Driver ループが進みます。契約に基づき、状態を進めることは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine のロジックにより、位相の進みが不要になったり、逆効果になったりすることがあります。たとえば、委任の最初の step が委任状態のルックアップと並列化できる SkyKey ルックアップを実行する場合、フェーズの進行によりそれらが順次実行されます。次の例に示すように、直接委任を行う方が理にかなっている場合があります。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

データフロー

これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

SkyValue ルックアップTasks.lookUp コールバックの実装例があります。このセクションでは、複数の SkyValue を処理するための根拠とアプローチについて説明します。

Tasks.lookUp 個のコールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的なアプローチは、Java ラムダを使用してこれを実装することです。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

ここで、myValue はルックアップを行う StateMachine インスタンスのメンバー変数です。ただし、ラムダでは、StateMachine 実装で Consumer<SkyValue> インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要です。あいまいになる可能性のある複数のルックアップがある場合でも、ラムダは便利です。

Tasks.lookUp には、SkyFunction.Environment.getValueOrThrow と同様のエラー処理オーバーロードもあります。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

以下に実装例を示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

エラー処理のないルックアップと同様に、StateMachine クラスにコールバックを直接実装すると、ラムダのメモリ割り当てが節約されます。

エラー処理で詳細を確認できますが、基本的には、エラーと通常の値の伝播に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要です。多くの場合、SkyValue の型を切り替える方法が有効です。次の例は、プロトタイプの本番環境コードを簡略化したものです。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

値の型が異なるため、Consumer<SkyValue> コールバック実装は明確に共有できます。そうでない場合は、ラムダベースの実装や、適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックすることが可能です。

StateMachine 間の値の伝播

これまでのところ、このドキュメントではサブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値をレポートする必要もあります。サブタスクは論理的に非同期であるため、その結果はコールバックを使用して呼び出し元に返されます。これを実現するために、サブタスクはコンストラクタを介して挿入されるシンク インターフェースを定義します。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼び出し元 StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例では、次のことを示しています。Caller は、結果を伝播して独自の Caller.ResultSink を定義する必要があります。CallerBarProducer.ResultSink コールバックを実装します。再開時に、processResultvalue が null かどうかをチェックして、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップのいずれかから出力を受け取った後の一般的な動作パターンです。

acceptBarError の実装は、エラー バブリングで要求されているように、結果を Caller.ResultSink に積極的に転送することに注意してください。

トップレベルの StateMachine の代替については、Driver と SkyFunctions へのブリッジで説明しています。

エラー処理

Tasks.lookUp コールバックStateMachines 間での値の伝播には、すでにエラー処理の例がいくつかあります。InterruptedException 以外の例外はスローされず、値としてコールバックを介して渡されます。このようなコールバックは、値またはエラーのいずれか 1 つだけが渡される排他的論理和セマンティクスを持つことがよくあります。

次のセクションでは、Skyframe のエラー処理との微妙ながらも重要なやり取りについて説明します。

エラー バブリング(--nokeep_going)

エラー バブリング中、リクエストされたすべての SkyValue が利用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine は例外を伝播する必要があります。

伝播は次の状態に達したかどうかに関係なく発生する必要があるため、エラー処理コールバックはこのタスクを実行する必要があります。内部 StateMachine の場合、これは親コールバックを呼び出すことで実現されます。

SkyFunction とインターフェースするトップレベルの StateMachine では、ValueOrExceptionProducersetException メソッドを呼び出すことで、これを行うことができます。ValueOrExceptionProducer.tryProduceValue は、SkyValue が欠落している場合でも例外をスローします。

Driver が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。

イベント処理

イベントを生成する必要がある SkyFunction の場合、StoredEventHandler が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine に挿入されます。以前は、Skyframe が特定のイベントを再生しない限りドロップするため、StoredEventHandler が必要でしたが、その後修正されました。StoredEventHandler インジェクションは、エラー処理コールバックから発行されるイベントの実装を簡素化するため、保持されます。

Driver と SkyFunctions へのブリッジ

Driver は、指定されたルート StateMachine から始まる StateMachine の実行を管理します。StateMachine はサブタスク StateMachine を再帰的にキューに追加できるため、1 つの Driver で多数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果であるツリー構造を作成します。Driver は、効率を向上させるために、サブタスク間で SkyValue ルックアップをバッチ処理します。

Driver を中心に構築されたクラスがいくつかあり、次の API があります。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。Driver.drive を呼び出すと、Skyframe を再起動せずに StateMachine を実行できるところまで実行します。StateMachine が完了した場合は true を返し、それ以外の場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。

DriverStateMachine の同時実行状態を維持し、SkyKeyComputeState に埋め込むのに適しています。

Driver を直接インスタンス化する

StateMachine の実装では、通常、コールバックを介して結果を伝えます。次の例に示すように、Driver を直接インスタンス化することもできます。

Driver は、対応する ResultSink の実装とともに SkyKeyComputeState 実装に埋め込まれます。ResultSink の実装については、少し後で定義します。最上位レベルでは、State オブジェクトは Driver よりも長生きすることが保証されているため、計算結果の適切な受信者となります。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードは ResultProducer のスケッチです。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

遅延計算の結果のコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

エンベディング Driver

StateMachine が値を生成し、例外が発生しない場合、次の例に示すように、Driver を埋め込むことも可能です。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(StateSkyKeyComputeState の関数固有の型です)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 実装に Driver を埋め込む方が、Skyframe の同期コーディング スタイルに適しています。

例外を生成する可能性のある StateMachine

それ以外の場合は、同期 SkyFunction コードと一致する同期 API を持つ SkyKeyComputeState 埋め込み可能な ValueOrExceptionProducer クラスと ValueOrException2Producer クラスがあります。

ValueOrExceptionProducer 抽象クラスには次のメソッドが含まれています。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

これには、埋め込み Driver インスタンスが含まれており、埋め込みドライバResultProducer クラスとよく似ており、同様の方法で SkyFunction とインターフェースします。ResultSink を定義する代わりに、実装では、いずれかのイベントが発生したときに setValue または setException を呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue メソッドは、非同期コールバック コードを同期コードにブリッジし、設定されている場合は例外をスローします。

前述のように、エラー バブリング中に、すべての入力が利用可能になっていないため、マシンがまだ完了していない場合でもエラーが発生する可能性があります。これに対応するため、tryProduceValue はマシンが完了する前であっても、設定された例外をスローします。

エピローグ: コールバックの最終的な削除

StateMachine は、非同期計算を行うための非常に効率的ですが、ボイラープレートを多用する方法です。継続(特に ListenableFuture に渡される Runnable の形式)は Bazel コードの特定の部分で広く使用されていますが、分析 SkyFunction では一般的ではありません。分析は主に CPU バウンドであり、ディスク I/O 用の効率的な非同期 API はありません。最終的には、コールバックは学習曲線があり、可読性を損なうため、最適化して取り除くことをおすすめします。

最も有望な代替手段の 1 つは、Java 仮想スレッドです。コールバックを記述する必要がなくなり、すべてが同期ブロッキング呼び出しに置き換えられます。これは、プラットフォーム スレッドとは異なり、仮想スレッド リソースの関連付けは安価であるためです。ただし、仮想スレッドを使用しても、単純な同期オペレーションをスレッド作成と同期プリミティブに置き換えるのはコストが高すぎます。StateMachines から Java 仮想スレッドへの移行を行ったところ、処理速度が大幅に低下し、エンドツーエンドの分析レイテンシが 3 倍近く増加しました。仮想スレッドはまだプレビュー機能であるため、パフォーマンスが向上したときに移行を実行できる可能性があります。

もう 1 つの方法として、Loom コルーチンが利用可能になった場合に、それを待つことを検討できます。この利点は、協調的マルチタスクを使用することで同期オーバーヘッドを削減できる可能性があることです。

他の方法がすべて失敗した場合は、低レベルのバイトコードの書き換えも有効な代替手段となります。十分に最適化すれば、手書きのコールバック コードに近いパフォーマンスを実現できる可能性があります。

付録

コールバック地獄

コールバック地獄は、コールバックを使用する非同期コードで悪名高い問題です。これは、後続のステップの継続が前のステップ内にネストされているためです。ステップが多い場合、このネストは非常に深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の 1 つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に final である必要があるため、このような変数を使用するのは面倒な場合があります。次のように、ラムダの代わりに継続としてメソッド参照を返すことで、深いネストを回避します。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクション パターンを過密に使用すると、コールバック地獄が発生することもありますが、インジェクションを順次ステップと組み合わせることで回避できます。

例: SkyValue ルックアップのチェーン

アプリケーション ロジックで SkyValue ルックアップの依存チェーンが必要になることがよくあります。たとえば、2 番目の SkyKey が最初の SkyValue に依存する場合などです。これを単純に考えると、複雑で深くネストされたコールバック構造になります。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されているため、コードは状態遷移全体で手続き型に見えます。step2step1 に続きます。ここでは、ラムダを使用して value2 を割り当てています。これにより、コードの順序が計算の順序と上から下へ一致します。

その他のヒント

読みやすさ: 実行順序

可読性を高めるため、StateMachine.step の実装は実行順に、コールバックの実装はコード内で渡される直後に配置するようにしてください。制御フローが分岐する場合、これは常に可能とは限りません。このような場合は、追加のコメントが役立つことがあります。

例: SkyValue ルックアップのチェーンでは、この目的を達成するために中間メソッド参照が作成されています。これにより、読みやすさと引き換えにパフォーマンスが若干低下しますが、ここではその価値があるでしょう。

世代仮説

存続期間が中程度の Java オブジェクトは、存続期間が非常に短いオブジェクトまたは永続的なオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState のオブジェクトはこの仮説に違反します。このようなオブジェクトは、Driver をルートとする、実行中のすべての StateMachine の構築済みツリーを含み、非同期計算の完了を待機して一時停止するため、中間的なライフサイクルを持ちます。

JDK19 では問題が軽減されているようですが、StateMachine を使用すると、実際に生成されるガベージが大幅に減少しても、GC 時間が増加することがあります。StateMachine の寿命は中間であるため、古い世代に昇格して、より早くいっぱいになる可能性があります。その結果、クリーンアップに高コストのメジャー GC またはフル GC が必要になります。

最初の予防策は、StateMachine 変数の使用を最小限に抑えることですが、複数の状態にわたって値が必要な場合など、常に実現可能とは限りません。可能な場合、ローカル スタックの step 変数は若い世代の変数であり、効率的に GC されます。

StateMachine 変数の場合は、サブタスクに分割し、StateMachine 間で値を伝播するの推奨パターンに従うことも有効です。このパターンに従うと、子 StateMachine のみが親 StateMachine への参照を持ち、その逆はないことに注意してください。つまり、子フラグメントが結果コールバックを使用して親フラグメントを完了して更新すると、子フラグメントは自然にスコープ外になり、GC の対象になります。

最後に、場合によっては、StateMachine 変数が前の状態では必要だが、後の状態では必要ないことがあります。大きなオブジェクトが不要になったことがわかったら、その参照を null にするとよいでしょう。

状態に名前を付ける

メソッドに名前を付ける場合、通常はそのメソッド内で発生する動作に名前を付けることができます。StateMachine にはスタックがないため、この方法をどのように行うかは明確ではありません。たとえば、メソッド foo がサブメソッド bar を呼び出すとします。StateMachine では、これは foo の状態シーケンスに変換され、その後に bar が続きます。foobar の動作が含まれなくなりました。そのため、状態のメソッド名はスコープが狭くなる傾向があり、ローカルな動作を反映している可能性があります。

同時実行の樹形図

次は、構造化された同時実行の図の別のビューで、ツリー構造をより適切に示しています。ブロックが小さな木を形成します。

構造化された同時実行 3D


  1. 値が使用できない場合に最初から再起動するという Skyframe の慣例とは対照的です。 

  2. stepInterruptedException をスローできますが、例では省略しています。Bazel コードには、この例外をスローするメソッドがいくつかあり、後述する StateMachine を実行する Driver まで伝播します。不要な場合は、スローされることを宣言しなくてもかまいません。 

  3. 同時実行サブタスクは、各依存関係に対して独立した作業を行う ConfiguredTargetFunction によって実現されます。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性を導入するのではなく、各依存関係に独自の独立した StateMachine があります。 

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しはバッチ処理されます。同時実行サブタスク内で発生するルックアップによって、追加のバッチ処理を作成できます。 

  5. これは、Java の構造化された並行処理 jeps/428 と概念的に似ています。 

  6. これは、スレッドを生成して結合し、順次構成を実現するのと似ています。