[料理佳餚] C# 實作二階段提交(Two-phase Commit),即使 SQL Server 沒有啟用 MSDTC 也能做分散式交易。

無論是業務需求或是 IT 需求的關係,依據不同需要將資料庫分放在不同伺服器的情形很多見,要進行跨資料庫伺服器交易我們可以啟用 MSDTC 服務來達成,我就在想能不能不去動伺服器的設定,在應用程式端來實現分散式交易?

二階段提交(Two-phase Commit)是實作分散式交易的其中一種演算法,它在參與交易的資料庫節點之間加入一個協調者(Coordinator),而參與交易的資料庫節點稱為參與者(Cohorts),協調者跟參與者之間溝通的流程是這樣的。

  1. 協調者向所有參與者發送 QUERY TO COMMIT 請求,參與者執行 QUERY 指令,但不 COMMIT,如果執行成功就回應協調者 YES,失敗就回應 NO,這個階段稱為「投票階段」。
  2. 協調者收到所有參與者的回應後,如果所有參與者都回應 YES,則協調者向所有參與者發送 COMMIT 請求;若有其中一個參與者回應 NO,則協調者向所有參與者發送 ROLLBACK 請求,這個階段稱為「完成階段」。

邏輯大致了解之後,我們就可以來開始實作了。

參與者

首先實作參與者,所需要的參數是 SQL 語句SQL 參數SQL 連線字串(參數的多寡可以依照實際上的需求去設計),包含三個非同步的方法 ExecuteAsync()CommitAsync()RollbackAsync()

public class TwoPhaseCommitCohort : IDisposable
{
    private readonly IDbConnection cnn;
    private readonly string sql;
    private readonly object param;
    private IDbTransaction transaction;

    public TwoPhaseCommitCohort(string sql, string connString)
        : this(sql, null, connString)
    {
    }

    public TwoPhaseCommitCohort(string sql, object param, string connString)
    {
        this.cnn = new SqlConnection(connString);
        this.sql = sql;
        this.param = param;
    }

    public Task<bool> ExecuteAsync()
    {
        return Task.Run(
            () =>
                {
                    try
                    {
                        this.cnn.Open();
                        this.transaction = this.cnn.BeginTransaction();

                        // need 'Dapper' library
                        this.cnn.Execute(this.sql, this.param, this.transaction);

                        return true;
                    }
                    catch
                    {
                        // TODO: Handle error!
                    }

                    return false;
                });
    }

    public Task CommitAsync()
    {
        return Task.Run(
            () =>
                {
                    this.transaction?.Commit();

                    this.Dispose();
                });
    }

    public Task RollbackAsync()
    {
        return Task.Run(
            () =>
                {
                    this.transaction?.Rollback();

                    this.Dispose();
                });
    }

    public void Dispose()
    {
        this.cnn?.Dispose();
        this.transaction?.Dispose();
    }
}

協調者

協調者的部分也有三個方法:

  • AddCohort():加入參與者
  • Vote():進行投票階段,呼叫參與者的 ExecuteAsync() 方法,獲得投票結果。
  • Finish():進行完成階段,投票結果全部成功就呼叫參與者的 CommitAsync() 方法,投票結果有任何一個失敗就呼叫參與者的 RollbackAsync() 方法。
public class TwoPhaseCommitCoordinator : IDisposable
{
    private readonly List<TwoPhaseCommitCohort> cohorts = new List<TwoPhaseCommitCohort>();

    public void AddCohort(TwoPhaseCommitCohort cohort)
    {
        this.cohorts.Add(cohort);
    }

    public bool Vote()
    {
        var tasks = this.cohorts.Select(x => x.ExecuteAsync()).ToList();

        Task.WaitAll(tasks.ToArray<Task>());

        return tasks.All(x => x.Result);
    }

    public void Finish(bool voteResult)
    {
        var tasks = voteResult
                        ? this.cohorts.Select(x => x.CommitAsync()).ToArray()
                        : this.cohorts.Select(x => x.RollbackAsync()).ToArray();

        Task.WaitAll(tasks);
    }

    public void Dispose()
    {
        this.cohorts.ForEach(x => x.Dispose());
    }
}

我準備了一個範例,假定網站會員的 FirstName 跟 LastName 放在不同的資料庫伺服器,要 UPDATE 會員的名字時,要嘛一起 UPDATE 成功,要嘛一起 UPDATE 失敗,利用剛剛實作好的二階段提交我們就可以這樣寫。

private void UpdateMemberName(string firstName, string lastName, int id)
{
    using (var coordinator = new TwoPhaseCommitCoordinator())
    {
        coordinator.AddCohort(
            new TwoPhaseCommitCohort(
                "UPDATE Test1 SET FirstName = @FirstName WHERE Id = @Id;",
                new { Id = id, FirstName = firstName },
                connString1));

        coordinator.AddCohort(
            new TwoPhaseCommitCohort(
                "UPDATE Test3 SET LastName = @LastName WHERE Id = @Id;",
                new { Id = id, LastName = lastName },
                connString2));

        coordinator.Finish(coordinator.Vote());
    }
}

以上就是用 C# 實作二階段提交的一個簡單範例,不過利用二階段提交做分散式交易的時候需要謹慎一些,因為很容易受到外部硬體資源的影響,只要其中有一個參與者出了狀況就會影響其他的參與者,比如其中一個參與者 SQL 語句沒調校好,因而執行的時間特別長,這樣其他參與者就會都在等它,那麼相關的資料庫資源被鎖定的時間就會拉長,進而導致整個資料庫系統的交易吞吐量降了下來,雖然我們可以接著實作三階段提交(Three-phase Commit)引入一些超時機制來舒緩這些問題,但其根本的解決之道還是在於對資料庫的設計多下點功夫才是。

參考資料