概要
Skyframe の StateMachine
は、ヒープに存在する分解された関数オブジェクトです。必要な値がすぐに使用可能ではなく、非同期で計算される場合、冗長性なしで柔軟な評価をサポートします。1StateMachine
は待機中にスレッド リソースを占有することはできません。代わりに、一時停止して再開する必要があります。このように分解することで、明示的な再エントリ ポイントが公開され、以前の計算をスキップできるようになります。
StateMachine
は、シーケンス、分岐、構造化された論理的並行性を表現するために使用でき、Skyframe インタラクション専用に調整されています。StateMachine
は、より大きな StateMachine
に構成して、サブ StateMachine
を共有できます。同時実行は常に階層構造であり、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。
はじめに
このセクションでは、java.com.google.devtools.build.skyframe.state
パッケージにある StateMachine
の動機と概要を簡単に説明します。
Skyframe の再起動の概要
Skyframe は、依存関係グラフの並列評価を実行するフレームワークです。グラフの各ノードは、パラメータを指定する SkyKey と結果を指定する SkyValue を使用した SkyFunction の評価に対応しています。計算モデルでは、SkyFunction が SkyKey で SkyValue をルックアップし、追加の SkyFunction の再帰的並列評価をトリガーします。スレッドを占有するブロッキングの代わりに、計算のサブグラフが不完全なため、リクエストされた SkyValue がまだ準備できていない場合、リクエスト元の SkyFunction は null
getValue
レスポンスを監視し、SkyValue ではなく null
を返す必要があります。これは、入力が不足しているため不完全であることを示します。Skyframe は、以前にリクエストされたすべての SkyValue が利用可能になると、SkyFunctions を再起動します。
SkyKeyComputeState
の導入前は、再起動を処理する従来の方法では、計算を完全に再実行していました。これは 2 次の複雑性がありますが、この方法で記述された関数は最終的に完了します。これは、再実行ごとに null
を返すルックアップの数が減るためです。SkyKeyComputeState
を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることができ、再計算を大幅に削減できます。
StateMachine
は SkyKeyComputeState
内に存在するオブジェクトで、実行の一時停止と再開のフックを公開することで、SkyFunction が再起動したときに(SkyKeyComputeState
がキャッシュから外れないと仮定して)再計算をほぼすべて排除します。
SkyKeyComputeState
内のステートフル コンピューティング
オブジェクト指向設計の観点から、純粋なデータ値ではなく、計算オブジェクトを SkyKeyComputeState
内に保存することを検討するのは理にかなっています。Java では、動作を伴うオブジェクトの最小限の説明は関数型インターフェースであり、これで十分です。StateMachine
には、次の再帰的な定義があります。2
@FunctionalInterface
public interface StateMachine {
StateMachine step(Tasks tasks) throws InterruptedException;
}
Tasks
インターフェースは SkyFunction.Environment
と似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3。
step
の戻り値は別の StateMachine
であり、ステップのシーケンスを帰納的に指定できます。StateMachine
が完了すると、step
は DONE
を返します。次に例を示します。
class HelloWorld implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
System.out.println("hello");
return this::step2; // The next step is HelloWorld.step2.
}
private StateMachine step2(Tasks tasks) {
System.out.println("world");
// DONE is special value defined in the `StateMachine` interface signaling
// that the computation is done.
return DONE;
}
}
次の出力で StateMachine
を記述します。
hello
world
step2
が StateMachine
の関数型インターフェース定義を満たしているため、メソッド参照 this::step2
も StateMachine
であることに注意してください。メソッド参照は、StateMachine
で次の状態を指定する最も一般的な方法です。
直感的に、計算をモノリシック関数ではなく StateMachine
ステップに分割すると、計算を一時停止して再開するために必要なフックが提供されます。StateMachine.step
が戻ると、明示的な一時停止ポイントがあります。返された StateMachine
値で指定された継続は、明示的な 再開ポイントです。計算を中断したところから正確に再開できるため、再計算を回避できます。
コールバック、継続、非同期計算
技術的には、StateMachine
は継続として機能し、実行する後続の計算を決定します。StateMachine
は、ブロックするのではなく、step
関数から戻ることで自発的に 一時停止 し、制御を Driver
インスタンスに戻します。Driver
は、準備完了の StateMachine
に切り替えるか、制御を Skyframe に戻すことができます。
従来、コールバックと継続は 1 つの概念に統合されていました。ただし、StateMachine
はこの 2 つを区別します。
- コールバック - 非同期計算の結果を保存する場所を記述します。
- 継続 - 次の実行状態を指定します。
非同期オペレーションを呼び出す場合はコールバックが必要です。つまり、SkyValue のルックアップの場合のように、メソッドを呼び出した直後に実際のオペレーションは発生しません。コールバックはできるだけシンプルに保つ必要があります。
継続は StateMachine
の StateMachine
戻り値であり、すべての非同期計算が解決した後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。
タスク
Tasks
インターフェースは、SkyKey で SkyValue を検索し、同時実行サブタスクをスケジュールする API を StateMachine
に提供します。
interface Tasks {
void enqueue(StateMachine subtask);
void lookUp(SkyKey key, Consumer<SkyValue> sink);
<E extends Exception>
void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);
// lookUp overloads for 2 and 3 exception types exist, but are elided here.
}
SkyValue ルックアップ
StateMachine
は Tasks.lookUp
オーバーロードを使用して SkyValue を検索します。これらは SkyFunction.Environment.getValue
および SkyFunction.Environment.getValueOrThrow
に類似しており、同様の例外処理セマンティクスを持ちます。実装では、ルックアップはすぐには実行されず、可能な限り多くのルックアップがバッチ処理されてから実行されます。値がすぐに利用できない場合(Skyframe の再起動が必要な場合など)があるため、呼び出し元は、コールバックを使用して結果の値を処理する方法を指定します。
StateMachine
プロセッサ(Driver
と SkyFrame へのブリッジ)は、次の状態が始まる前に値が利用可能であることを保証します。以下の例を
class DoesLookup implements StateMachine, Consumer<SkyValue> {
private Value value;
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
return this::processValue;
}
// The `lookUp` call in `step` causes this to be called before `processValue`.
@Override // Implementation of Consumer<SkyValue>.
public void accept(SkyValue value) {
this.value = (Value)value;
}
private StateMachine processValue(Tasks tasks) {
System.out.println(value); // Prints the string representation of `value`.
return DONE;
}
}
上記の例では、最初のステップで new Key()
のルックアップを行い、コンシューマーとして this
を渡します。これは、DoesLookup
が Consumer<SkyValue>
を実装しているためです。
契約により、次の状態 DoesLookup.processValue
が開始される前に、DoesLookup.step
のすべてのルックアップが完了します。したがって、processValue
でアクセスされると value
が利用可能になります。
サブタスク
Tasks.enqueue
は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine
であり、通常の StateMachine
ができることはすべて実行できます。たとえば、サブタスクを再帰的に作成したり、SkyValues を検索したりできます。lookUp
と同様に、ステート マシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を
class Subtasks implements StateMachine {
private int i = 0;
@Override
public StateMachine step(Tasks tasks) {
tasks.enqueue(new Subtask1());
tasks.enqueue(new Subtask2());
// The next step is Subtasks.processResults. It won't be called until both
// Subtask1 and Subtask 2 are complete.
return this::processResults;
}
private StateMachine processResults(Tasks tasks) {
System.out.println(i); // Prints "3".
return DONE; // Subtasks is done.
}
private class Subtask1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
i += 1;
return DONE; // Subtask1 is done.
}
}
private class Subtask2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
i += 2;
return DONE; // Subtask2 is done.
}
}
}
Subtask1
と Subtask2
は論理的には同時実行ですが、すべてが単一のスレッドで実行されるため、i
の「同時」更新には同期は必要ありません。
構造化された同時実行
すべての lookUp
と enqueue
は次の状態に進む前に解決する必要があるため、並行処理は自然にツリー構造に制限されます。次の例に示すように、階層型5同時実行を作成できます。
UML から、並行処理構造がツリーを形成していることを判断するのは困難です。ツリー構造をより適切に表示する代替ビューがあります。
構造化された同時実行は、推論がはるかに容易です。
構成と制御フロー パターン
このセクションでは、複数の StateMachine
を構成する方法の例と、特定の制御フローの問題の解決策を示します。
順次状態
これは、最も一般的でわかりやすい制御フロー パターンです。例については、SkyKeyComputeState
内のステートフルな計算をご覧ください。
分岐
StateMachine
の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。
class Branch implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
// Returns different state machines, depending on condition.
if (shouldUseA()) {
return this::performA;
}
return this::performB;
}
…
}
特定のブランチが早期完了のために DONE
を返すことはよくあります。
高度なシーケンシャル合成
StateMachine
制御構造はメモリレスであるため、StateMachine
定義をサブタスクとして共有すると、不自然になることがあります。M1 と M2 を StateMachine
を共有する StateMachine
インスタンスとし、M1 と M2 をそれぞれシーケンス <A, S, B> と <X, S, Y> とします。問題は、S が完了後に B または Y のどちらを続行すべきかわからないことと、StateMachine
がコールスタックを完全に保持しないことです。このセクションでは、これを実現するための手法をいくつか紹介します。
StateMachine
を終了シーケンス要素として使用
この方法では、最初に提示された問題は解決しません。共有 StateMachine
がシーケンスの終端である場合にのみ、シーケンシャル コンポジションを示します。
// S is the shared state machine.
class S implements StateMachine { … }
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
return new S();
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
return new S();
}
}
これは、S 自体が複雑な状態マシンである場合でも機能します。
順次コンポジションのサブタスク
キューに登録されたサブタスクは次の状態の前に完了することが保証されているため、サブタスク メカニズムを少しだけ悪用できる場合があります。6
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
// S starts after `step` returns and by contract must complete before `doB`
// begins. It is effectively sequential, inducing the sequence < A, S, B >.
tasks.enqueue(new S());
return this::doB;
}
private StateMachine doB(Tasks tasks) {
performB();
return DONE;
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
// Similarly, this induces the sequence < X, S, Y>.
tasks.enqueue(new S());
return this::doY;
}
private StateMachine doY(Tasks tasks) {
performY();
return DONE;
}
}
runAfter
インジェクション
S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp
呼び出しがあるため、Tasks.enqueue
を悪用できない場合があります。この場合、runAfter
パラメータを S に挿入して、次の処理を S に通知できます。
class S implements StateMachine {
// Specifies what to run after S completes.
private final StateMachine runAfter;
@Override
public StateMachine step(Tasks tasks) {
… // Performs some computations.
return this::processResults;
}
@Nullable
private StateMachine processResults(Tasks tasks) {
… // Does some additional processing.
// Executes the state machine defined by `runAfter` after S completes.
return runAfter;
}
}
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
// Passes `this::doB` as the `runAfter` parameter of S, resulting in the
// sequence < A, S, B >.
return new S(/* runAfter= */ this::doB);
}
private StateMachine doB(Tasks tasks) {
performB();
return DONE;
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
// Passes `this::doY` as the `runAfter` parameter of S, resulting in the
// sequence < X, S, Y >.
return new S(/* runAfter= */ this::doY);
}
private StateMachine doY(Tasks tasks) {
performY();
return DONE;
}
}
このアプローチは、サブタスクを濫用するよりもクリーンです。ただし、複数の StateMachine
を runAfter
でネストするなど、この方法を過度に適用すると、コールバック地獄に陥ります。代わりに、通常のシーケンシャル状態を使用してシーケンシャル runAfter
を分割することをおすすめします。
return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))
に置き換えることができます。
private StateMachine step1(Tasks tasks) {
doStep1();
return new S(/* runAfter= */ this::intermediateStep);
}
private StateMachine intermediateStep(Tasks tasks) {
return new T(/* runAfter= */ this::nextStep);
}
禁止の代替: runAfterUnlessError
以前のドラフトでは、エラーが発生するとすぐに中止する runAfterUnlessError
を検討していました。これは、エラーが 2 回チェックされることが多いという事実に基づいています。1 回は runAfter
参照を持つ StateMachine
によって、もう 1 回は runAfter
マシン自体によってチェックされます。
検討の結果、エラーチェックの重複排除よりもコードの均一性が重要であると判断しました。runAfter
メカニズムが tasks.enqueue
メカニズムと一貫した方法で動作しないと、混乱を招く可能性があります。tasks.enqueue
メカニズムでは常にエラー チェックが必要です。
直接委任
正式な状態遷移が発生するたびに、メインの Driver
ループが進みます。契約に基づき、状態を進めることは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine
のロジックにより、位相の進みが不要になったり、逆効果になったりすることがあります。たとえば、委任の最初の step
が委任状態のルックアップと並列化できる SkyKey ルックアップを実行する場合、フェーズの進行によりそれらが順次実行されます。次の例に示すように、直接委任を行う方が理にかなっている場合があります。
class Parent implements StateMachine {
@Override
public StateMachine step(Tasks tasks ) {
tasks.lookUp(new Key1(), this);
// Directly delegates to `Delegate`.
//
// The (valid) alternative:
// return new Delegate(this::afterDelegation);
// would cause `Delegate.step` to execute after `step` completes which would
// cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
return new Delegate(this::afterDelegation).step(tasks);
}
private StateMachine afterDelegation(Tasks tasks) {
…
}
}
class Delegate implements StateMachine {
private final StateMachine runAfter;
Delegate(StateMachine runAfter) {
this.runAfter = runAfter;
}
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new Key2(), this);
return …;
}
// Rest of implementation.
…
private StateMachine complete(Tasks tasks) {
…
return runAfter;
}
}
データフロー
これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。
Tasks.lookUp
コールバックの実装
SkyValue ルックアップに Tasks.lookUp
コールバックの実装例があります。このセクションでは、複数の SkyValue を処理するための根拠とアプローチについて説明します。
Tasks.lookUp
個のコールバック
Tasks.lookUp
メソッドは、コールバック sink
をパラメータとして受け取ります。
void lookUp(SkyKey key, Consumer<SkyValue> sink);
慣用的なアプローチは、Java ラムダを使用してこれを実装することです。
tasks.lookUp(key, value -> myValue = (MyValueClass)value);
ここで、myValue
はルックアップを行う StateMachine
インスタンスのメンバー変数です。ただし、ラムダでは、StateMachine
実装で Consumer<SkyValue>
インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要です。あいまいになる可能性のある複数のルックアップがある場合でも、ラムダは便利です。
Tasks.lookUp
には、SkyFunction.Environment.getValueOrThrow
と同様のエラー処理オーバーロードもあります。
<E extends Exception> void lookUp(
SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);
interface ValueOrExceptionSink<E extends Exception> {
void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
}
以下に実装例を示します。
class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
private MyValue value;
private MyException error;
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
return this::processResult;
}
@Override
public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
if (value != null) {
this.value = (MyValue)value;
return;
}
if (exception != null) {
this.error = exception;
return;
}
throw new IllegalArgumentException("Both parameters were unexpectedly null.");
}
private StateMachine processResult(Tasks tasks) {
if (exception != null) {
// Handles the error.
…
return DONE;
}
// Processes `value`, which is non-null.
…
}
}
エラー処理のないルックアップと同様に、StateMachine
クラスにコールバックを直接実装すると、ラムダのメモリ割り当てが節約されます。
エラー処理で詳細を確認できますが、基本的には、エラーと通常の値の伝播に大きな違いはありません。
複数の SkyValue を使用する
多くの場合、複数の SkyValue ルックアップが必要です。多くの場合、SkyValue の型を切り替える方法が有効です。次の例は、プロトタイプの本番環境コードを簡略化したものです。
@Nullable
private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
var configurationKey = configuredTarget.getConfigurationKey();
if (configurationKey != null) {
tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
}
var packageId = configuredTarget.getLabel().getPackageIdentifier();
tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);
return this::constructResult;
}
@Override // Implementation of `Consumer<SkyValue>`.
public void accept(SkyValue value) {
if (value instanceof BuildConfigurationValue) {
this.configurationValue = (BuildConfigurationValue) value;
return;
}
if (value instanceof PackageValue) {
this.pkg = ((PackageValue) value).getPackage();
return;
}
throw new IllegalArgumentException("unexpected value: " + value);
}
値の型が異なるため、Consumer<SkyValue>
コールバック実装は明確に共有できます。そうでない場合は、ラムダベースの実装や、適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックすることが可能です。
StateMachine
間の値の伝播
これまでのところ、このドキュメントではサブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値をレポートする必要もあります。サブタスクは論理的に非同期であるため、その結果はコールバックを使用して呼び出し元に返されます。これを実現するために、サブタスクはコンストラクタを介して挿入されるシンク インターフェースを定義します。
class BarProducer implements StateMachine {
// Callers of BarProducer implement the following interface to accept its
// results. Exactly one of the two methods will be called by the time
// BarProducer completes.
interface ResultSink {
void acceptBarValue(Bar value);
void acceptBarError(BarException exception);
}
private final ResultSink sink;
BarProducer(ResultSink sink) {
this.sink = sink;
}
… // StateMachine steps that end with this::complete.
private StateMachine complete(Tasks tasks) {
if (hasError()) {
sink.acceptBarError(getError());
return DONE;
}
sink.acceptBarValue(getValue());
return DONE;
}
}
呼び出し元 StateMachine
は次のようになります。
class Caller implements StateMachine, BarProducer.ResultSink {
interface ResultSink {
void acceptCallerValue(Bar value);
void acceptCallerError(BarException error);
}
private final ResultSink sink;
private Bar value;
Caller(ResultSink sink) {
this.sink = sink;
}
@Override
@Nullable
public StateMachine step(Tasks tasks) {
tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
return this::processResult;
}
@Override
public void acceptBarValue(Bar value) {
this.value = value;
}
@Override
public void acceptBarError(BarException error) {
sink.acceptCallerError(error);
}
private StateMachine processResult(Tasks tasks) {
// Since all enqueued subtasks resolve before `processResult` starts, one of
// the `BarResultSink` callbacks must have been called by this point.
if (value == null) {
return DONE; // There was a previously reported error.
}
var finalResult = computeResult(value);
sink.acceptCallerValue(finalResult);
return DONE;
}
}
上記の例では、次のことを示しています。Caller
は、結果を伝播して独自の Caller.ResultSink
を定義する必要があります。Caller
は BarProducer.ResultSink
コールバックを実装します。再開時に、processResult
は value
が null かどうかをチェックして、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップのいずれかから出力を受け取った後の一般的な動作パターンです。
acceptBarError
の実装は、エラー バブリングで要求されているように、結果を Caller.ResultSink
に積極的に転送することに注意してください。
トップレベルの StateMachine
の代替については、Driver
と SkyFunctions へのブリッジで説明しています。
エラー処理
Tasks.lookUp
コールバックと StateMachines
間での値の伝播には、すでにエラー処理の例がいくつかあります。InterruptedException
以外の例外はスローされず、値としてコールバックを介して渡されます。このようなコールバックは、値またはエラーのいずれか 1 つだけが渡される排他的論理和セマンティクスを持つことがよくあります。
次のセクションでは、Skyframe のエラー処理との微妙ながらも重要なやり取りについて説明します。
エラー バブリング(--nokeep_going)
エラー バブリング中、リクエストされたすべての SkyValue が利用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks
API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine
は例外を伝播する必要があります。
伝播は次の状態に達したかどうかに関係なく発生する必要があるため、エラー処理コールバックはこのタスクを実行する必要があります。内部 StateMachine
の場合、これは親コールバックを呼び出すことで実現されます。
SkyFunction とインターフェースするトップレベルの StateMachine
では、ValueOrExceptionProducer
の setException
メソッドを呼び出すことで、これを行うことができます。ValueOrExceptionProducer.tryProduceValue
は、SkyValue が欠落している場合でも例外をスローします。
Driver
が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。
イベント処理
イベントを生成する必要がある SkyFunction の場合、StoredEventHandler
が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine
に挿入されます。以前は、Skyframe が特定のイベントを再生しない限りドロップするため、StoredEventHandler
が必要でしたが、その後修正されました。StoredEventHandler
インジェクションは、エラー処理コールバックから発行されるイベントの実装を簡素化するため、保持されます。
Driver
と SkyFunctions へのブリッジ
Driver
は、指定されたルート StateMachine
から始まる StateMachine
の実行を管理します。StateMachine
はサブタスク StateMachine
を再帰的にキューに追加できるため、1 つの Driver
で多数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果であるツリー構造を作成します。Driver
は、効率を向上させるために、サブタスク間で SkyValue ルックアップをバッチ処理します。
Driver
を中心に構築されたクラスがいくつかあり、次の API があります。
public final class Driver {
public Driver(StateMachine root);
public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}
Driver
は、単一のルート StateMachine
をパラメータとして受け取ります。Driver.drive
を呼び出すと、Skyframe を再起動せずに StateMachine
を実行できるところまで実行します。StateMachine
が完了した場合は true を返し、それ以外の場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。
Driver
は StateMachine
の同時実行状態を維持し、SkyKeyComputeState
に埋め込むのに適しています。
Driver
を直接インスタンス化する
StateMachine
の実装では、通常、コールバックを介して結果を伝えます。次の例に示すように、Driver
を直接インスタンス化することもできます。
Driver
は、対応する ResultSink
の実装とともに SkyKeyComputeState
実装に埋め込まれます。ResultSink
の実装については、少し後で定義します。最上位レベルでは、State
オブジェクトは Driver
よりも長生きすることが保証されているため、計算結果の適切な受信者となります。
class State implements SkyKeyComputeState, ResultProducer.ResultSink {
// The `Driver` instance, containing the full tree of all `StateMachine`
// states. Responsible for calling `StateMachine.step` implementations when
// asynchronous values are available and performing batched SkyFrame lookups.
//
// Non-null while `result` is being computed.
private Driver resultProducer;
// Variable for storing the result of the `StateMachine`
//
// Will be non-null after the computation completes.
//
private ResultType result;
// Implements `ResultProducer.ResultSink`.
//
// `ResultProducer` propagates its final value through a callback that is
// implemented here.
@Override
public void acceptResult(ResultType result) {
this.result = result;
}
}
以下のコードは ResultProducer
のスケッチです。
class ResultProducer implements StateMachine {
interface ResultSink {
void acceptResult(ResultType value);
}
private final Parameters parameters;
private final ResultSink sink;
… // Other internal state.
ResultProducer(Parameters parameters, ResultSink sink) {
this.parameters = parameters;
this.sink = sink;
}
@Override
public StateMachine step(Tasks tasks) {
… // Implementation.
return this::complete;
}
private StateMachine complete(Tasks tasks) {
sink.acceptResult(getResult());
return DONE;
}
}
遅延計算の結果のコードは次のようになります。
@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
throws InterruptedException {
if (state.result != null) {
return state.result;
}
if (state.resultProducer == null) {
state.resultProducer = new Driver(new ResultProducer(
new Parameters(), (ResultProducer.ResultSink)state));
}
if (state.resultProducer.drive(env)) {
// Clears the `Driver` instance as it is no longer needed.
state.resultProducer = null;
}
return state.result;
}
エンベディング Driver
StateMachine
が値を生成し、例外が発生しない場合、次の例に示すように、Driver
を埋め込むことも可能です。
class ResultProducer implements StateMachine {
private final Parameters parameters;
private final Driver driver;
private ResultType result;
ResultProducer(Parameters parameters) {
this.parameters = parameters;
this.driver = new Driver(this);
}
@Nullable // Null when a Skyframe restart is needed.
public ResultType tryProduceValue( SkyFunction.Environment env)
throws InterruptedException {
if (!driver.drive(env)) {
return null;
}
return result;
}
@Override
public StateMachine step(Tasks tasks) {
… // Implementation.
}
SkyFunction には、次のようなコードが含まれている場合があります(State
は SkyKeyComputeState
の関数固有の型です)。
@Nullable // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
throws InterruptedException {
if (state.result != null) {
return state.result;
}
if (state.resultProducer == null) {
state.resultProducer = new ResultProducer(new Parameters());
}
var result = state.resultProducer.tryProduceValue(env);
if (result == null) {
return null;
}
state.resultProducer = null;
return state.result = result;
}
StateMachine
実装に Driver
を埋め込む方が、Skyframe の同期コーディング スタイルに適しています。
例外を生成する可能性のある StateMachine
それ以外の場合は、同期 SkyFunction コードと一致する同期 API を持つ SkyKeyComputeState
埋め込み可能な ValueOrExceptionProducer
クラスと ValueOrException2Producer
クラスがあります。
ValueOrExceptionProducer
抽象クラスには次のメソッドが含まれています。
public abstract class ValueOrExceptionProducer<V, E extends Exception>
implements StateMachine {
@Nullable
public final V tryProduceValue(Environment env)
throws InterruptedException, E {
… // Implementation.
}
protected final void setValue(V value) { … // Implementation. }
protected final void setException(E exception) { … // Implementation. }
}
これには、埋め込み Driver
インスタンスが含まれており、埋め込みドライバの ResultProducer
クラスとよく似ており、同様の方法で SkyFunction とインターフェースします。ResultSink
を定義する代わりに、実装では、いずれかのイベントが発生したときに setValue
または setException
を呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue
メソッドは、非同期コールバック コードを同期コードにブリッジし、設定されている場合は例外をスローします。
前述のように、エラー バブリング中に、すべての入力が利用可能になっていないため、マシンがまだ完了していない場合でもエラーが発生する可能性があります。これに対応するため、tryProduceValue
はマシンが完了する前であっても、設定された例外をスローします。
エピローグ: コールバックの最終的な削除
StateMachine
は、非同期計算を行うための非常に効率的ですが、ボイラープレートを多用する方法です。継続(特に ListenableFuture
に渡される Runnable
の形式)は Bazel コードの特定の部分で広く使用されていますが、分析 SkyFunction では一般的ではありません。分析は主に CPU バウンドであり、ディスク I/O 用の効率的な非同期 API はありません。最終的には、コールバックは学習曲線があり、可読性を損なうため、最適化して取り除くことをおすすめします。
最も有望な代替手段の 1 つは、Java 仮想スレッドです。コールバックを記述する必要がなくなり、すべてが同期ブロッキング呼び出しに置き換えられます。これは、プラットフォーム スレッドとは異なり、仮想スレッド リソースの関連付けは安価であるためです。ただし、仮想スレッドを使用しても、単純な同期オペレーションをスレッド作成と同期プリミティブに置き換えるのはコストが高すぎます。StateMachine
s から Java 仮想スレッドへの移行を行ったところ、処理速度が大幅に低下し、エンドツーエンドの分析レイテンシが 3 倍近く増加しました。仮想スレッドはまだプレビュー機能であるため、パフォーマンスが向上したときに移行を実行できる可能性があります。
もう 1 つの方法として、Loom コルーチンが利用可能になった場合に、それを待つことを検討できます。この利点は、協調的マルチタスクを使用することで同期オーバーヘッドを削減できる可能性があることです。
他の方法がすべて失敗した場合は、低レベルのバイトコードの書き換えも有効な代替手段となります。十分に最適化すれば、手書きのコールバック コードに近いパフォーマンスを実現できる可能性があります。
付録
コールバック地獄
コールバック地獄は、コールバックを使用する非同期コードで悪名高い問題です。これは、後続のステップの継続が前のステップ内にネストされているためです。ステップが多い場合、このネストは非常に深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。
class CallbackHell implements StateMachine {
@Override
public StateMachine step(Tasks task) {
doA();
return (t, l) -> {
doB();
return (t1, l2) -> {
doC();
return DONE;
};
};
}
}
ネストされた実装の利点の 1 つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に final である必要があるため、このような変数を使用するのは面倒な場合があります。次のように、ラムダの代わりに継続としてメソッド参照を返すことで、深いネストを回避します。
class CallbackHellAvoided implements StateMachine {
@Override
public StateMachine step(Tasks task) {
doA();
return this::step2;
}
private StateMachine step2(Tasks tasks) {
doB();
return this::step3;
}
private StateMachine step3(Tasks tasks) {
doC();
return DONE;
}
}
runAfter
インジェクション パターンを過密に使用すると、コールバック地獄が発生することもありますが、インジェクションを順次ステップと組み合わせることで回避できます。
例: SkyValue ルックアップのチェーン
アプリケーション ロジックで SkyValue ルックアップの依存チェーンが必要になることがよくあります。たとえば、2 番目の SkyKey が最初の SkyValue に依存する場合などです。これを単純に考えると、複雑で深くネストされたコールバック構造になります。
private ValueType1 value1;
private ValueType2 value2;
private StateMachine step1(...) {
tasks.lookUp(key1, (Consumer<SkyValue>) this); // key1 has type KeyType1.
return this::step2;
}
@Override
public void accept(SkyValue value) {
this.value1 = (ValueType1) value;
}
private StateMachine step2(...) {
KeyType2 key2 = computeKey(value1);
tasks.lookup(key2, this::acceptValueType2);
return this::step3;
}
private void acceptValueType2(SkyValue value) {
this.value2 = (ValueType2) value;
}
ただし、継続はメソッド参照として指定されているため、コードは状態遷移全体で手続き型に見えます。step2
は step1
に続きます。ここでは、ラムダを使用して value2
を割り当てています。これにより、コードの順序が計算の順序と上から下へ一致します。
その他のヒント
読みやすさ: 実行順序
可読性を高めるため、StateMachine.step
の実装は実行順に、コールバックの実装はコード内で渡される直後に配置するようにしてください。制御フローが分岐する場合、これは常に可能とは限りません。このような場合は、追加のコメントが役立つことがあります。
例: SkyValue ルックアップのチェーンでは、この目的を達成するために中間メソッド参照が作成されています。これにより、読みやすさと引き換えにパフォーマンスが若干低下しますが、ここではその価値があるでしょう。
世代仮説
存続期間が中程度の Java オブジェクトは、存続期間が非常に短いオブジェクトまたは永続的なオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState
のオブジェクトはこの仮説に違反します。このようなオブジェクトは、Driver
をルートとする、実行中のすべての StateMachine
の構築済みツリーを含み、非同期計算の完了を待機して一時停止するため、中間的なライフサイクルを持ちます。
JDK19 では問題が軽減されているようですが、StateMachine
を使用すると、実際に生成されるガベージが大幅に減少しても、GC 時間が増加することがあります。StateMachine
の寿命は中間であるため、古い世代に昇格して、より早くいっぱいになる可能性があります。その結果、クリーンアップに高コストのメジャー GC またはフル GC が必要になります。
最初の予防策は、StateMachine
変数の使用を最小限に抑えることですが、複数の状態にわたって値が必要な場合など、常に実現可能とは限りません。可能な場合、ローカル スタックの step
変数は若い世代の変数であり、効率的に GC されます。
StateMachine
変数の場合は、サブタスクに分割し、StateMachine
間で値を伝播するの推奨パターンに従うことも有効です。このパターンに従うと、子 StateMachine
のみが親 StateMachine
への参照を持ち、その逆はないことに注意してください。つまり、子フラグメントが結果コールバックを使用して親フラグメントを完了して更新すると、子フラグメントは自然にスコープ外になり、GC の対象になります。
最後に、場合によっては、StateMachine
変数が前の状態では必要だが、後の状態では必要ないことがあります。大きなオブジェクトが不要になったことがわかったら、その参照を null にするとよいでしょう。
状態に名前を付ける
メソッドに名前を付ける場合、通常はそのメソッド内で発生する動作に名前を付けることができます。StateMachine
にはスタックがないため、この方法をどのように行うかは明確ではありません。たとえば、メソッド foo
がサブメソッド bar
を呼び出すとします。StateMachine
では、これは foo
の状態シーケンスに変換され、その後に bar
が続きます。foo
に bar
の動作が含まれなくなりました。そのため、状態のメソッド名はスコープが狭くなる傾向があり、ローカルな動作を反映している可能性があります。
同時実行の樹形図
次は、構造化された同時実行の図の別のビューで、ツリー構造をより適切に示しています。ブロックが小さな木を形成します。
-
値が使用できない場合に最初から再起動するという Skyframe の慣例とは対照的です。 ↩
-
step
はInterruptedException
をスローできますが、例では省略しています。Bazel コードには、この例外をスローするメソッドがいくつかあり、後述するStateMachine
を実行するDriver
まで伝播します。不要な場合は、スローされることを宣言しなくてもかまいません。 ↩ -
同時実行サブタスクは、各依存関係に対して独立した作業を行う
ConfiguredTargetFunction
によって実現されます。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性を導入するのではなく、各依存関係に独自の独立したStateMachine
があります。 ↩ -
1 つのステップ内の複数の
tasks.lookUp
呼び出しはバッチ処理されます。同時実行サブタスク内で発生するルックアップによって、追加のバッチ処理を作成できます。 ↩ -
これは、スレッドを生成して結合し、順次構成を実現するのと似ています。 ↩