無論是業務需求或是 IT 需求的關係,依據不同需要將資料庫分放在不同伺服器的情形很多見,要進行跨資料庫伺服器交易我們可以啟用 MSDTC 服務來達成,我就在想能不能不去動伺服器的設定,在應用程式端來實現分散式交易?
二階段提交(Two-phase Commit)是實作分散式交易的其中一種演算法,它在參與交易的資料庫節點之間加入一個協調者(Coordinator)
,而參與交易的資料庫節點稱為參與者(Cohorts)
,協調者跟參與者之間溝通的流程是這樣的。
- 協調者向所有參與者發送
QUERY TO COMMIT
請求,參與者執行 QUERY 指令,但不 COMMIT,如果執行成功就回應協調者YES
,失敗就回應NO
,這個階段稱為「投票階段」。 - 協調者收到所有參與者的回應後,如果所有參與者都回應 YES,則協調者向所有參與者發送
COMMIT
請求;若有其中一個參與者回應 NO,則協調者向所有參與者發送ROLLBACK
請求,這個階段稱為「完成階段」。
邏輯大致了解之後,我們就可以來開始實作了。
參與者
首先實作參與者,所需要的參數是 SQL 語句
、SQL 參數
、SQL 連線字串
(參數的多寡可以依照實際上的需求去設計),包含三個非同步的方法 ExecuteAsync()
、CommitAsync()
、RollbackAsync()
。
public class TwoPhaseCommitCohort : IDisposable
{
private readonly IDbConnection cnn;
private readonly string sql;
private readonly object param;
private IDbTransaction transaction;
public TwoPhaseCommitCohort(string sql, string connString)
: this(sql, null, connString)
{
}
public TwoPhaseCommitCohort(string sql, object param, string connString)
{
this.cnn = new SqlConnection(connString);
this.sql = sql;
this.param = param;
}
public Task<bool> ExecuteAsync()
{
return Task.Run(
() =>
{
try
{
this.cnn.Open();
this.transaction = this.cnn.BeginTransaction();
// need 'Dapper' library
this.cnn.Execute(this.sql, this.param, this.transaction);
return true;
}
catch
{
// TODO: Handle error!
}
return false;
});
}
public Task CommitAsync()
{
return Task.Run(
() =>
{
this.transaction?.Commit();
this.Dispose();
});
}
public Task RollbackAsync()
{
return Task.Run(
() =>
{
this.transaction?.Rollback();
this.Dispose();
});
}
public void Dispose()
{
this.cnn?.Dispose();
this.transaction?.Dispose();
}
}
協調者
協調者的部分也有三個方法:
AddCohort()
:加入參與者Vote()
:進行投票階段,呼叫參與者的 ExecuteAsync() 方法,獲得投票結果。Finish()
:進行完成階段,投票結果全部成功就呼叫參與者的 CommitAsync() 方法,投票結果有任何一個失敗就呼叫參與者的 RollbackAsync() 方法。
public class TwoPhaseCommitCoordinator : IDisposable
{
private readonly List<TwoPhaseCommitCohort> cohorts = new List<TwoPhaseCommitCohort>();
public void AddCohort(TwoPhaseCommitCohort cohort)
{
this.cohorts.Add(cohort);
}
public bool Vote()
{
var tasks = this.cohorts.Select(x => x.ExecuteAsync()).ToList();
Task.WaitAll(tasks.ToArray<Task>());
return tasks.All(x => x.Result);
}
public void Finish(bool voteResult)
{
var tasks = voteResult
? this.cohorts.Select(x => x.CommitAsync()).ToArray()
: this.cohorts.Select(x => x.RollbackAsync()).ToArray();
Task.WaitAll(tasks);
}
public void Dispose()
{
this.cohorts.ForEach(x => x.Dispose());
}
}
我準備了一個範例,假定網站會員的 FirstName 跟 LastName 放在不同的資料庫伺服器,要 UPDATE 會員的名字時,要嘛一起 UPDATE 成功,要嘛一起 UPDATE 失敗,利用剛剛實作好的二階段提交我們就可以這樣寫。
private void UpdateMemberName(string firstName, string lastName, int id)
{
using (var coordinator = new TwoPhaseCommitCoordinator())
{
coordinator.AddCohort(
new TwoPhaseCommitCohort(
"UPDATE Test1 SET FirstName = @FirstName WHERE Id = @Id;",
new { Id = id, FirstName = firstName },
connString1));
coordinator.AddCohort(
new TwoPhaseCommitCohort(
"UPDATE Test3 SET LastName = @LastName WHERE Id = @Id;",
new { Id = id, LastName = lastName },
connString2));
coordinator.Finish(coordinator.Vote());
}
}
以上就是用 C# 實作二階段提交的一個簡單範例,不過利用二階段提交做分散式交易的時候需要謹慎一些,因為很容易受到外部硬體資源的影響,只要其中有一個參與者出了狀況就會影響其他的參與者,比如其中一個參與者 SQL 語句沒調校好,因而執行的時間特別長,這樣其他參與者就會都在等它,那麼相關的資料庫資源被鎖定的時間就會拉長,進而導致整個資料庫系統的交易吞吐量降了下來,雖然我們可以接著實作三階段提交(Three-phase Commit)引入一些超時機制來舒緩這些問題,但其根本的解決之道還是在於對資料庫的設計多下點功夫才是。