SkyFrame StateMachines 指南

回報問題 查看來源 Nightly · 8.3 · 8.2 · 8.1 · 8.0 · 7.6

總覽

Skyframe StateMachine 是位於堆積上的「解構」函式物件。如果必要值無法立即取得,但會以非同步方式計算,則支援彈性評估,且不會出現多餘的1StateMachine 無法在等待時綁定執行緒資源,而是必須暫停及繼續。因此,解構會公開明確的重新進入點,以便略過先前的運算。

StateMachine 可用於表示序列、分支、結構化邏輯並行,且專為 Skyframe 互動量身打造。StateMachine 可組成較大的 StateMachine,並共用子 StateMachine。並行作業一律會依建構方式分層,且完全是邏輯上的。每個並行子工作都會在單一共用父項 SkyFunction 執行緒中執行。

簡介

本節會簡要說明 StateMachine 的動機和簡介,這些 StateMachine 位於 java.com.google.devtools.build.skyframe.state 套件中。

Skyframe 重新啟動簡介

Skyframe 是一個架構,可平行評估依附元件圖表。圖表中的每個節點都會對應至 SkyFunction 的評估結果,其中 SkyKey 會指定參數,而 SkyValue 則會指定結果。計算模型是這樣:SkyFunction 可以透過 SkyKey 查閱 SkyValue,觸發其他 SkyFunction 的遞迴平行評估。如果計算的某個子圖不完整,導致要求的 SkyValue 尚未準備就緒,要求 SkyFunction 會觀察 null getValue 回應,並傳回 null (而非 SkyValue),表示因缺少輸入內容而不完整。這樣做不會封鎖執行緒。當所有先前要求的 SkyValue 都可用時,Skyframe 會重新啟動 SkyFunctions。

在推出 SkyKeyComputeState 之前,處理重新啟動的傳統方式是完全重新執行運算。雖然這樣做會產生二次方複雜度,但以這種方式編寫的函式最終會完成,因為每次重新執行時,傳回 null 的查閱次數都會減少。使用 SkyKeyComputeState 即可將手動指定的檢查點資料與 SkyFunction 建立關聯,大幅節省重新運算的時間。

StateMachine 是存在於 SkyKeyComputeState 內的物件,可透過公開暫停和繼續執行掛鉤,在 SkyFunction 重新啟動時 (假設 SkyKeyComputeState 不會從快取中移除),幾乎完全避免重新運算。

SkyKeyComputeState 內的有狀態運算

從物件導向設計的角度來看,考慮將運算物件儲存在 SkyKeyComputeState 內,而非純資料值,是合理的做法。在 Java 中,攜帶物件行為的最基本說明是函式介面,而這已足夠。StateMachine 的定義如下,相當於遞迴定義2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 介面與 SkyFunction.Environment 類似,但專為非同步設計,並新增對邏輯並行子工作的支援3

step 的傳回值是另一個 StateMachine,可歸納指定一連串步驟。step 會在 StateMachine 完成時傳回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

說明 StateMachine,並輸出下列內容。

hello
world

請注意,由於 step2 滿足 StateMachine 的函式介面定義,方法參照 this::step2 也是 StateMachine。方法參照是指定 StateMachine 中下一個狀態最常見的方式。

暫停及繼續

直覺上,將運算作業分解為 StateMachine 步驟,而非單一函式,可提供暫停繼續運算作業所需的掛鉤。StateMachine.step 傳回時,會有明確的暫停點。傳回的 StateMachine 值所指定的續傳是明確的「繼續」點。因此,運算作業可以從中斷處繼續執行,避免重新運算。

回呼、續傳和非同步運算

以技術角度來說,StateMachine 可做為「續傳」,決定要執行的後續運算。StateMachine 不會遭到封鎖,而是會從 step 函式傳回,藉此自願暫停,將控制權轉移回 Driver 例項。Driver 即可切換至就緒 StateMachine 狀態,或將控制權交還給 Skyframe。

傳統上,回呼續集會合併為一個概念。不過,StateMachines 會區分這兩者。

  • 回呼 - 說明非同步運算結果的儲存位置。
  • 後續 - 指定下一個執行狀態。

叫用非同步作業時必須使用回呼,也就是說,實際作業不會在呼叫方法後立即發生,例如 SkyValue 查閱作業。回呼應盡可能保持簡單。

續集StateMachineStateMachine 傳回值,並封裝所有非同步計算解析後接續執行的複雜作業。這種結構化做法有助於控制回呼的複雜度。

工作

Tasks 介面提供 API,可讓 StateMachine 依據 SkyKey 查閱 SkyValue,並排定並行子工作。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查詢

StateMachines 使用 Tasks.lookUp 多載來查閱 SkyValue。這兩者類似於 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,且具有類似的例外狀況處理語意。實作不會立即執行查閱作業,而是盡可能批次處理4 個查閱作業,再執行查閱。值可能不會立即提供 (例如需要重新啟動 Skyframe),因此呼叫端會使用回呼指定如何處理產生的值。

StateMachine 處理器 (Drivers 和橋接至 SkyFrame) 可確保值在下一個狀態開始前可用。範例如下:

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上例中,第一個步驟會查詢 new Key(),並將 this 做為消費者傳遞。這是因為 DoesLookup 實作了 Consumer<SkyValue>

根據合約,在下一個狀態 DoesLookup.processValue 開始前,所有 DoesLookup.step 的查閱作業都會完成。因此,在 processValue 中存取時,value 可供使用。

子工作

Tasks.enqueue 要求執行邏輯上並行的子工作。子工作也是 StateMachine,可以執行一般 StateMachine 的所有操作,包括遞迴建立更多子工作或查詢 SkyValues。與 lookUp 類似,狀態機器驅動程式會確保所有子工作都已完成,再繼續進行下一個步驟。範例如下:

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

雖然 Subtask1Subtask2 在邏輯上是並行,但所有項目都在單一執行緒中執行,因此 i 的「並行」更新不需要任何同步處理。

結構化並行

由於每個 lookUpenqueue 都必須先解析,才能進入下一個狀態,因此並行作業自然會限制在樹狀結構。您可以建立階層式5並行,如下列範例所示。

結構化並行

UML 來看,並不容易判斷並行結構是否形成樹狀結構。 您也可以使用替代檢視畫面,更清楚地查看樹狀結構。

非結構化並行

結構化並行更容易分析。

組合和控制流程模式

本節將提供多個 StateMachine 的組合範例,以及特定控制流程問題的解決方案。

依序狀態

這是最常見且最簡單的控制流程模式。如需這方面的範例,請參閱「SkyKeyComputeState 內部的有狀態計算」

分支

如要達成 StateMachine 中的分支狀態,可以使用一般 Java 控制流程傳回不同值,如下列範例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

某些分支機構很常提早完成並傳回 DONE

進階連續組合

由於 StateMachine 控制結構沒有記憶體,因此有時會難以將 StateMachine 定義分享為子工作。假設 M1M2 是共用 StateMachineStateMachine 執行個體,S 分別為 <A、S、B><X、S、Y> 序列。問題在於 S 不知道完成後要繼續執行 BY,而 StateMachine 並不會保留呼叫堆疊。本節將介紹幾種達成此目標的技巧。

StateMachine 做為終端序列元素

這無法解決最初提出的問題。只有在共用 StateMachine 是序列中的終端時,才會示範循序組合。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是複雜的狀態機器,也能正常運作。

依序組合的子工作

由於排入佇列的子工作保證會在下一個狀態之前完成,因此有時可能會稍微濫用6子工作機制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有時,由於在 S 執行前,必須完成其他平行子工作或 Tasks.lookUp 呼叫,因此無法濫用 Tasks.enqueue。在這種情況下,將 runAfter 參數注入 S,即可告知 S 後續該怎麼做。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

相較於濫用子工作,這種做法更簡潔。不過,如果過度使用這項功能 (例如以 runAfter 巢狀結構形式使用多個 StateMachine),就會導致回呼地獄。建議改用一般連續狀態,中斷連續 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替換為下列內容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止替代方案:runAfterUnlessError

在先前的草稿中,我們考慮過 runAfterUnlessError,這會提早因錯誤而中止。這是因為錯誤通常會經過兩次檢查,一次由具有 runAfter 參照的 StateMachine 檢查,另一次由 runAfter 機台本身檢查。

經過審慎考慮後,我們認為程式碼的統一性比重複檢查錯誤更重要。如果 runAfter 機制與一律需要錯誤檢查的 tasks.enqueue 機制運作方式不一致,可能會造成混淆。

直接委派

每次正式狀態轉換時,主要 Driver 迴圈都會前進。 根據合約,推進狀態表示所有先前加入佇列的 SkyValue 查閱和子工作都會在下一個狀態執行前解析。有時,委派 StateMachine 的邏輯會導致階段前進不必要或適得其反。舉例來說,如果委派項目的第一個 step 執行 SkyKey 查閱作業,而這些作業可與委派狀態的查閱作業平行處理,則階段前進會使這些作業依序執行。如下方範例所示,直接委派可能更有意義。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

資料流程

先前的討論重點在於管理控制流程。本節說明資料值的傳播。

實作 Tasks.lookUp 回呼

如需實作 Tasks.lookUp 回呼的範例,請參閱「SkyValue 查閱」。本節提供處理多個 SkyValue 的基本原理和建議方法。

Tasks.lookUp 回呼

Tasks.lookUp 方法會將回呼 sink 做為參數。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

如要以符合語言習慣的方式實作這項功能,請使用 Java lambda:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是執行查閱作業的 StateMachine 執行個體的成員變數。不過,相較於在 StateMachine 實作中實作 Consumer<SkyValue> 介面,Lambda 需要額外的記憶體配置。如果有多個會造成模稜兩可的查閱作業,lambda 仍可派上用場。

此外,Tasks.lookUp 也有錯誤處理超載,與 SkyFunction.Environment.getValueOrThrow 類似。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

實作範例如下所示。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

與沒有錯誤處理的查閱作業一樣,讓 StateMachine 類別直接實作回呼,可節省 lambda 的記憶體配置。

錯誤處理提供更多詳細資料,但基本上,錯誤傳播與正常值傳播的差異不大。

使用多個 SkyValue

通常需要多次查詢 SkyValue。大部分時間都適用的方法是開啟 SkyValue 的類型。以下範例是從原型製作程式碼簡化而來。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

由於值型別不同,因此可以明確共用 Consumer<SkyValue> 回呼實作內容。如果不是這種情況,可以改用以 Lambda 為基礎的實作方式,或是實作適當回呼的完整內部類別執行個體。

StateMachine 之間傳播值

到目前為止,本文件只說明如何在子工作中安排工作,但子工作也需要將值回報給呼叫端。由於子工作在邏輯上屬於非同步,因此系統會使用回呼將結果傳回給呼叫端。如要達成這個目標,子工作會定義透過建構函式插入的接收器介面。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼叫端 StateMachine 如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上述範例說明瞭幾件事。Caller 必須將結果傳播回去,並定義自己的 Caller.ResultSinkCaller 會實作 BarProducer.ResultSink 回呼。恢復後,processResult 會檢查 value 是否為空值,判斷是否發生錯誤。接受子工作或 SkyValue 查詢的輸出內容後,通常會出現這種行為模式。

請注意,acceptBarError 的實作會根據錯誤冒泡的規定,將結果急切地轉送至 Caller.ResultSink

頂層 StateMachine 的替代方案請參閱Driver 和橋接至 SkyFunctions

處理錯誤

Tasks.lookUp回呼StateMachines 之間傳播值中,已有幾個錯誤處理範例。系統不會擲回 InterruptedException 以外的例外狀況,而是透過回呼以值的形式傳遞。這類回呼通常具有互斥或語意,且只會傳遞值或錯誤。

下一節將說明與 Skyframe 錯誤處理的細微但重要的互動。

錯誤冒泡 (--nokeep_going)

在錯誤冒泡期間,即使並非所有要求的 SkyValue 都可用,SkyFunction 仍可能會重新啟動。在這種情況下,由於 Tasks API 協定,系統永遠不會達到後續狀態。不過,StateMachine 仍應傳播例外狀況。

無論是否達到下一個狀態,都必須進行傳播,因此錯誤處理回呼必須執行這項工作。如果是內部 StateMachine,則會叫用父項回呼來達成此目的。

在頂層 StateMachine (與 SkyFunction 介面互動) 中,可以呼叫 ValueOrExceptionProducersetException 方法來完成這項操作。ValueOrExceptionProducer.tryProduceValue 隨後會擲回例外狀況,即使缺少 SkyValue 也是如此。

如果直接使用 Driver,即使機器尚未完成處理作業,也務必檢查 SkyFunction 傳播的錯誤。

事件處理

對於需要發出事件的 SkyFunction,系統會將 StoredEventHandler 注入 SkyKeyComputeState,然後進一步注入需要這些事件的 StateMachine。過去,由於 Skyframe 會捨棄特定事件 (除非重新播放),因此需要 StoredEventHandler,但這個問題隨後已修正。保留 StoredEventHandler 插入作業,是因為這樣可簡化從錯誤處理回呼發出的事件實作程序。

Driver,並橋接至 SkyFunctions

Driver 負責管理 StateMachine 的執行作業,從指定的根 StateMachine 開始。由於 StateMachine 可以遞迴將子工作 StateMachine 加入佇列,因此單一 Driver 可以管理大量子工作。這些子工作會建立樹狀結構,這是結構化並行的結果。Driver 批次處理子工作中的 SkyValue 查閱作業,以提升效率。

有許多類別是圍繞 Driver 建構,並使用下列 API。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 會將單一根 StateMachine 做為參數。呼叫 Driver.drive 會執行 StateMachine,直到無法繼續執行為止,且不會重新啟動 Skyframe。如果 StateMachine 完成,則傳回 true,否則傳回 false,表示並非所有值都可用。

Driver 會維護 StateMachine 的並行狀態,非常適合嵌入 SkyKeyComputeState

直接例項化 Driver

StateMachine 通常會透過回呼傳達結果。您可以直接例項化 Driver,如以下範例所示。

Driver 會嵌入 SkyKeyComputeState 實作項目,以及對應的 ResultSink 實作項目 (稍後會進一步定義)。在頂層,State 物件是適合用來接收運算結果的接收器,因為保證會比 Driver 更長壽。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下程式碼會繪製 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

接著,延遲計算結果的程式碼可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入 Driver

如果 StateMachine 產生值且未引發例外狀況,則嵌入 Driver 也是可能的實作方式,如下例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction 可能會有類似下列的程式碼 (其中 StateSkyKeyComputeState 的函式專屬型別)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 實作中嵌入 Driver,更適合 Skyframe 的同步編碼樣式。

可能會產生例外狀況的 StateMachine

否則,您可以使用 SkyKeyComputeState 可嵌入的 ValueOrExceptionProducerValueOrException2Producer 類別,這些類別具有同步 API,可與同步 SkyFunction 程式碼相符。

ValueOrExceptionProducer 抽象類別包含下列方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

其中包含內嵌的 Driver 例項,與 Embedding driver 中的 ResultProducer 類別非常相似,且與 SkyFunction 的介面類似。請勿定義 ResultSink,而是實作在發生上述任一情況時呼叫 setValuesetException。如果兩者都發生,則例外狀況優先。tryProduceValue 方法會將非同步回呼程式碼橋接至同步程式碼,並在設定例外狀況時擲回例外狀況。

如先前所述,在錯誤冒泡期間,即使機器尚未完成作業,也可能發生錯誤,因為並非所有輸入內容都可用。為配合這項做法,tryProduceValue 會擲回任何設定的例外狀況,即使機器尚未完成作業也一樣。

尾聲:最終移除回呼

StateMachine 是執行非同步運算的高效率方式,但需要大量樣板。在 Bazel 程式碼的某些部分,續集 (特別是以傳遞至 ListenableFutureRunnable 形式) 十分常見,但在分析 SkyFunctions 中並不常見。分析作業大多會受到 CPU 限制,而且沒有適用於磁碟 I/O 的有效非同步 API。最終,最好能最佳化回呼,因為回呼有學習曲線,且會影響可讀性。

其中一個最有前景的替代方案是 Java 虛擬執行緒。不必再編寫回呼,一切都改用同步封鎖呼叫。這是因為繫結虛擬執行緒資源的成本應該很低,不像平台執行緒。不過,即使使用虛擬執行緒,以執行緒建立和同步處理基本項目取代簡單的同步作業,成本仍過高。我們從 StateMachine 遷移至 Java 虛擬執行緒,但速度慢了好幾個數量級,導致端對端分析延遲時間幾乎增加 3 倍。由於虛擬執行緒仍處於預覽階段,因此您可能會在效能提升後,於日後執行這項遷移作業。

如果 Loom 協同程式日後推出,也可以考慮使用。優點是可能可以透過合作多工處理,減少同步處理的負擔。

如果其他方法全都失效,低階位元組碼重寫也是可行的替代方案。經過充分最佳化後,或許就能達到接近手寫回呼程式碼的效能。

附錄

回呼地獄

回呼地獄是使用回呼的非同步程式碼中,臭名昭彰的問題。這是因為後續步驟的延續項目會巢狀內嵌於前一個步驟中。如果步驟很多,巢狀結構可能會非常深。如果與控制流程搭配使用,程式碼會變得難以管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

巢狀實作的優點之一,是可保留外部步驟的堆疊框架。在 Java 中,擷取的 lambda 變數必須是有效最終變數,因此使用這類變數可能很麻煩。如要避免深層巢狀結構,請傳回方法參照做為續集,而不是 Lambda,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 插入模式使用過於密集,也可能發生回呼地獄,但只要在插入內容之間穿插連續步驟,即可避免這種情況。

範例:串連的 SkyValue 查閱

應用程式邏輯通常需要依附於 SkyValue 查閱鏈結,舉例來說,如果第二個 SkyKey 依附於第一個 SkyValue,如果以簡單的方式思考,這會導致複雜的深層巢狀回呼結構。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不過,由於續集指定為方法參照,因此程式碼在狀態轉換期間看起來是程序式:step2 接著 step1。請注意,這裡使用 Lambda 指派 value2。這樣一來,程式碼的順序就會與由上而下的運算順序相符。

其他提示

可讀性:執行順序

為提升可讀性,請盡量讓 StateMachine.step 實作項目保持執行順序,並在程式碼中傳遞回呼實作項目後,立即執行這些項目。如果控制流程會分支,則不一定能做到這一點。在這種情況下,額外註解可能會有幫助。

在「範例:鏈結 SkyValue 查閱」中,會建立中介方法參照來達成此目的。這會稍微犧牲效能,換取可讀性,但應該值得。

世代假設

中等存留時間的 Java 物件會破壞 Java 垃圾收集器的世代假設,這個收集器專門處理存留時間極短或永久存留的物件。根據定義,SkyKeyComputeState 中的物件會違反這項假設。這類物件包含所有仍在執行的 StateMachine 的建構樹狀結構,以 Driver 為根,具有中繼生命週期,因為它們會暫停,等待非同步運算完成。

在 JDK19 中,這個問題似乎較不嚴重,但使用 StateMachine 時,即使實際產生的垃圾大幅減少,有時仍可能觀察到 GC 時間增加。由於 StateMachine 的生命週期介於兩者之間,因此可能會升級為舊世代,導致填滿速度更快,進而需要執行成本較高的大型或完整 GC 來清理。

初步的防範措施是盡量減少使用 StateMachine 變數,但這並非總是可行,例如,如果多個狀態都需要某個值,如果可以,本機堆疊 step 變數會是年輕代變數,並有效率地進行垃圾收集。

對於 StateMachine 變數,將工作分解為子工作,並遵循StateMachine 之間傳播值的建議模式,也有助於解決問題。請注意,按照模式操作時,只有子項 StateMachine 參照上層 StateMachine,反之則否。也就是說,當孩子完成作業並使用結果回呼更新家長時,孩子自然會超出範圍,並符合 GC 資格。

最後,在某些情況下,較早的狀態需要 StateMachine 變數,但較晚的狀態則不需要。如果知道不再需要大型物件的參照,將其設為空值可能會有好處。

命名狀態

命名方法時,通常可以根據該方法內發生的行為命名。在 StateMachine 中,由於沒有堆疊,因此較不清楚如何執行這項操作。舉例來說,假設方法 foo 會呼叫子方法 bar。在 StateMachine 中,這可能會轉譯為狀態序列 foo,後面接著 barfoo 不再包含 bar 行為。因此,各州的方法名稱範圍往往較窄,可能反映當地行為。

並行樹狀圖

下圖是「結構化並行」的替代檢視畫面,可更清楚地描繪樹狀結構。這些方塊會組成一棵小樹。

結構化並行 3D


  1. 與 Skyframe 的慣例不同,如果值無法使用,系統會從頭開始。 

  2. 請注意,step 可以擲回 InterruptedException,但範例會省略這項操作。Bazel 程式碼中有幾個低階方法會擲回這個例外狀況,並向上傳播至 Driver (稍後會說明),該方法會執行 StateMachine。不需要時,可以不用宣告要擲回的例外狀況。 

  3. 並行子工作是由 ConfiguredTargetFunction 驅動,可為每個依附元件執行獨立工作。與其操控會一次處理所有依附元件的複雜資料結構,進而造成效率低落,不如為每個依附元件提供各自獨立的 StateMachine。 

  4. 單一步驟中的多個 tasks.lookUp 呼叫會批次處理。 在並行子工作內進行的查閱作業,可能會建立額外的批次處理。

  5. 這在概念上與 Java 的結構化並行 jeps/428 類似。

  6. 這麼做類似於產生執行緒並加入執行緒,以達成循序組合。