Skip to main content
Version: 6.4

事务和并发

从 v3.4 开始,MongoDB 驱动程序 也支持事务。

¥Starting v3.4, transactions are also supported in MongoDB driver.

事务划分

¥Transaction Demarcation

事务划分是定义事务边界的任务。正确的事务划分非常重要,因为如果做得不正确,它会对应用的性能产生负面影响。许多数据库和数据库抽象层默认以自动提交模式运行,这意味着每个 SQL 语句都封装在一个小的事务中。如果你这边没有任何明确的事务划分,这很快就会导致性能不佳,因为事务并不便宜。

¥Transaction demarcation is the task of defining your transaction boundaries. Proper transaction demarcation is very important because if not done properly it can negatively affect the performance of your application. Many databases and database abstraction layers by default operate in auto-commit mode, which means that every single SQL statement is wrapped in a small transaction. Without any explicit transaction demarcation from your side, this quickly results in poor performance because transactions are not cheap.

在大多数情况下,MikroORM 已经为你处理了正确的事务划分:所有写入操作(INSERT/UPDATE/DELETE)都排队,直到调用 em.flush(),将所有这些更改封装在一个事务中。

¥For the most part, MikroORM already takes care of proper transaction demarcation for you: All the write operations (INSERT/UPDATE/DELETE) are queued until em.flush() is invoked which wraps all of these changes in a single transaction.

但是,MikroORM 还允许(并鼓励)你自己接管和控制事务划分。

¥However, MikroORM also allows (and encourages) you to take over and control transaction demarcation yourself.

这是使用 MikroORM 时处理事务的两种方法,现在将更详细地描述。

¥These are two ways to deal with transactions when using the MikroORM and are now described in more detail.

方法 1:隐式

¥Approach 1: Implicitly

第一种方法是使用 MikroORM EntityManager 提供的隐式事务处理。给出以下代码片段,没有任何明确的事务划分:

¥The first approach is to use the implicit transaction handling provided by the MikroORM EntityManager. Given the following code snippet, without any explicit transaction demarcation:

const user = new User(...);
user.name = 'George';
await orm.em.persist(user).flush();

由于我们没有在上面的代码中进行任何自定义事务划分,em.flush() 将开始并提交/回滚事务。此行为是通过 MikroORM 聚合 DML 操作实现的,如果作为工作单元一部分的所有数据操作都通过域模型和 ORM 进行,则此行为就足够了。

¥Since we do not do any custom transaction demarcation in the above code, em.flush() will begin and commit/rollback a transaction. This behavior is made possible by the aggregation of the DML operations by the MikroORM and is sufficient if all the data manipulation that is part of a unit of work happens through the domain model and thus the ORM.

方法 2:显式

¥Approach 2: Explicitly

显式替代方法是直接使用事务 API 来控制边界。然后代码如下所示:

¥The explicit alternative is to use the transactions API directly to control the boundaries. The code then looks like this:

await orm.em.transactional(em => {
//... do some work
const user = new User(...);
user.name = 'George';
em.persist(user);
});

或者你可以明确使用 begin/commit/rollback 方法。以下示例与上一个示例相同:

¥Or you can use begin/commit/rollback methods explicitly. Following example is equivalent to the previous one:

const em = orm.em.fork();
await em.begin();

try {
//... do some work
const user = new User(...);
user.name = 'George';
em.persist(user);
await em.commit(); // will flush before making the actual commit query
} catch (e) {
await em.rollback();
throw e;
}

处理显式事务的另一种方法是使用 @Transactional() 装饰器。

¥Another way to handle explicit transactions is by using the @Transactional() decorator.

import { EntityManager, MikroORM, Transactional } from '@mikro-orm/core';

export class MyService {

constructor(private readonly em: EntityManager) { }

@Transactional()
async doSomething() {
//... do some work
const user = new User(...);
user.name = 'George';
em.persist(user);
}

}

此装饰器用 em.transactional() 封装方法,因此你可以像 em.transactional() 一样提供 TransactionOptions。区别在于,你可以通过提供 context 选项来指定事务开始的上下文,如果省略,事务将隐式地在当前上下文中开始。它适用于异步函数,可以与 em.transactional() 嵌套。

¥This decorator wraps the method with em.transactional(), so you can provide TransactionOptions just like with em.transactional(). The difference is that you can specify the context in which the transaction begins by providing context option, and if omitted, the transaction will begin in the current context implicitly. It works on async functions and can be nested with em.transactional().

当你想要在工作单元中包含自定义 DBAL 操作或想要使用需要活动事务的 EntityManager API 的某些方法时,需要显式事务划分。此类方法将抛出 ValidationError 以通知你该要求。

¥Explicit transaction demarcation is required when you want to include custom DBAL operations in a unit of work or when you want to make use of some methods of the EntityManager API that require an active transaction. Such methods will throw a ValidationError to inform you of that requirement.

em.transactional(cb)@Transactional() 将在事务提交之前刷新内部 EntityManager

¥em.transactional(cb) and the @Transactional() will flush the inner EntityManager before transaction commit.

异常处理

¥Exception Handling

使用隐式事务划分并在 em.flush() 期间发生异常时,事务会自动回滚。

¥When using implicit transaction demarcation and an exception occurs during em.flush(), the transaction is automatically rolled back.

使用显式事务划分并发生异常时,应立即回滚事务,如上例所示。这可以通过前面显示的控制抽象优雅地处理。请注意,在捕获异常时,你通常应该重新抛出异常。如果你打算从某些异常中恢复,请在较早的 catch 块中明确捕获它们(但不要忘记回滚事务)。所有其他异常处理的最佳实践都适用(即记录或重新抛出,而不是两者兼而有之,等等)。

¥When using explicit transaction demarcation and an exception occurs, the transaction should be rolled back immediately as demonstrated in the example above. This can be handled elegantly by the control abstractions shown earlier. Note that when catching Exception you should generally re-throw the exception. If you intend to recover from some exceptions, catch them explicitly in earlier catch blocks (but do not forget to rollback the transaction). All other best practices of exception handling apply similarly (i.e. either log or re-throw, not both, etc.).

作为此过程的结果,所有先前管理或删除的 EntityManager 实例都将被分离。分离对象的状态将是事务回滚时的状态。对象的状态绝不会回滚,因此对象现在与数据库不同步。应用可以继续使用分离的对象,知道它们的状态可能不再准确。

¥As a result of this procedure, all previously managed or removed instances of the EntityManager become detached. The state of the detached objects will be the state at the point at which the transaction was rolled back. The state of the objects is in no way rolled back and thus the objects are now out of sync with the database. The application can continue to use the detached objects, knowing that their state is potentially no longer accurate.

如果你打算在发生异常后启动另一个工作单元,则应使用新的 EntityManager 执行此操作。只需传递这样的嵌套 where 条件,所有请求的关系将自动连接。

¥If you intend to start another unit of work after an exception has occurred you should do that with a new EntityManager. Simply use em.fork() to obtain fresh copy with cleared identity map.

锁定支持

¥Locking Support

为什么我们需要并发控制?

¥Why we need concurrency control?

如果事务是按顺序执行的(一次一个),则不存在事务并发。但是,如果允许具有交错操作的并发事务,你可能会很容易遇到以下问题之一:

¥If transactions are executed serially (one at a time), no transaction concurrency exists. However, if concurrent transactions with interleaving operations are allowed, you may easily run into one of those problems:

  1. 丢失更新问题

    ¥The lost update problem

  2. 脏读问题

    ¥The dirty read problem

  3. 不正确的摘要问题

    ¥The incorrect summary problem

为了缓解这些问题,MikroORM 原生支持悲观和乐观锁定策略。这允许你对应用中实体所需的锁定类型进行非常细粒度的控制。

¥To mitigate those problems, MikroORM offers support for Pessimistic and Optimistic locking strategies natively. This allows you to take very fine-grained control over what kind of locking is required for your entities in your application.

乐观锁定

¥Optimistic Locking

数据库事务对于单个请求期间的并发控制很好。但是,数据库事务不应跨越请求,即所谓的 "用户思考时间"。因此,跨越多个请求的长时间运行的 "业务事务" 需要涉及多个数据库事务。因此,在如此长时间运行的业务事务中,单独的数据库事务不再能控制并发性。并发控制成为应用本身的部分责任。

¥Database transactions are fine for concurrency control during a single request. However, a database transaction should not span across requests, the so-called "user think time". Therefore, a long-running "business transaction" that spans multiple requests needs to involve several database transactions. Thus, database transactions alone can no longer control concurrency during such a long-running business transaction. Concurrency control becomes the partial responsibility of the application itself.

MikroORM 通过版本字段集成了对自动乐观锁定的支持。在这种方法中,任何在长时间运行的业务事务期间应受到保护以防止并发修改的实体都会获得一个版本字段,该字段要么是简单数字(映射类型:整数),要么是时间戳(映射类型:日期时间)。当在长时间对话结束时持久化对此类实体的更改时,会将实体的版本与数据库中的版本进行比较,如果它们不匹配,则会抛出 OptimisticLockError,表示该实体已被其他人修改。

¥MikroORM has integrated support for automatic optimistic locking via a version field. In this approach any entity that should be protected against concurrent modifications during long-running business transactions gets a version field that is either a simple number (mapping type: integer) or a timestamp (mapping type: datetime). When changes to such an entity are persisted at the end of a long-running conversation the version of the entity is compared to the version in the database and if they don't match, a OptimisticLockError is thrown, indicating that the entity has been modified by someone else already.

你在实体中指定版本字段如下。在此示例中,我们将使用一个整数。

¥You designate a version field in an entity as follows. In this example we'll use an integer.

export class User {
// ...
@Property({ version: true })
version!: number;
// ...
}

或者,可以使用日期时间类型(映射到 SQL 时间戳或日期时间):

¥Alternatively a datetime type can be used (which maps to a SQL timestamp or datetime):

export class User {
// ...
@Property({ version: true })
version!: Date;
// ...
}

但是,应该优先使用版本号(而不是时间戳),因为它们在高度并发的环境中不会发生冲突,而时间戳则有可能发生冲突,具体取决于特定数据库平台上时间戳的分辨率。

¥Version numbers (not timestamps) should however be preferred as they can not potentially conflict in a highly concurrent environment, unlike timestamps where this is a possibility, depending on the resolution of the timestamp on the particular database platform.

当在 em.flush() 期间遇到版本冲突时,将抛出 OptimisticLockError 并回滚活动事务(或标记为回滚)。可以捕获和处理此异常。对 OptimisticLockError 的潜在响应是向用户呈现冲突或在新事务中刷新或重新加载对象,然后重试该事务。

¥When a version conflict is encountered during em.flush(), a OptimisticLockError is thrown and the active transaction rolled back (or marked for rollback). This exception can be caught and handled. Potential responses to a OptimisticLockError are to present the conflict to the user or to refresh or reload objects in a new transaction and then retrying the transaction.

在最坏的情况下,显示更新表单和实际修改实体之间的时间可能与应用的会话超时一样长。如果在该时间范围内实体发生变化,你希望在检索实体时直接知道你将遇到乐观锁定异常:

¥The time between showing an update form and actually modifying the entity can in the worst scenario be as long as your application's session timeout. If changes happen to the entity in that time frame you want to know directly when retrieving the entity that you will hit an optimistic locking exception:

你始终可以在请求期间调用 em.findOne() 时验证实体的版本:

¥You can always verify the version of an entity during a request either when calling em.findOne():

const theEntityId = 1;
const expectedVersion = 184;

try {
const entity = await orm.em.findOne(User, theEntityId, { lockMode: LockMode.OPTIMISTIC, lockVersion: expectedVersion });

// do the work

await orm.em.flush();
} catch (e) {
console.log('Sorry, but someone else has already changed this entity. Please apply the changes again!');
}

或者你可以使用 em.lock() 来找出:

¥Or you can use em.lock() to find out:

const theEntityId = 1;
const expectedVersion = 184;
const entity = await orm.em.findOne(User, theEntityId);

try {
// assert version
await orm.em.lock(entity, LockMode.OPTIMISTIC, expectedVersion);
} catch (e) {
console.log('Sorry, but someone else has already changed this entity. Please apply the changes again!');
}

并发检查

¥Concurrency Checks

与自动处理的版本字段相反,我们可以使用并发检查。它们允许我们标记要包含在并发检查中的特定属性,就像版本字段一样。但是这次,我们将负责明确更新字段。

¥As opposed to version fields that are handled automatically, we can use concurrency checks. They allow us to mark specific properties to be included in the concurrency check, just like the version field was. But this time, we will be responsible for updating the fields explicitly.

当我们尝试在不更改并发字段之一的情况下更新此类实体时,将抛出 OptimisticLockError。然后使用相同的机制来检查更新是否成功,如果不成功,则抛出相同类型的错误。

¥When we try to update such entity without changing one of the concurrency fields, OptimisticLockError will be thrown. Same mechanism is then used to check whether the update succeeded, and throw the same type of error when not.

@Entity()
export class ConcurrencyCheckUser {

// all primary keys are by default part of the concurrency check
@PrimaryKey({ length: 100 })
firstName: string;

// all primary keys are by default part of the concurrency check
@PrimaryKey({ length: 100 })
lastName: string;

@Property({ concurrencyCheck: true })
age: number;

@Property({ nullable: true })
other?: string;

}

重要实现说明

¥Important Implementation Notes

如果比较错误的版本,你很容易得到错误的乐观锁定工作流程。假设 Alice 和 Bob 正在编辑一篇假想的博客文章:

¥You can easily get the optimistic locking workflow wrong if you compare the wrong versions. Say you have Alice and Bob editing a hypothetical blog post:

  • Alice 读取博客文章的标题是 "Foo",处于乐观锁定版本 1(GET 请求)

    ¥Alice reads the headline of the blog post being "Foo", at optimistic lock version 1 (GET Request)

  • Bob 读取博客文章的标题为 "Foo",处于乐观锁版本 1(GET 请求)

    ¥Bob reads the headline of the blog post being "Foo", at optimistic lock version 1 (GET Request)

  • Bob 将标题更新为 "Bar",将乐观锁版本升级为 2(表单的 POST 请求)

    ¥Bob updates the headline to "Bar", upgrading the optimistic lock version to 2 (POST Request of a Form)

  • Alice 将标题更新为 "Baz",...(表单的 POST 请求)

    ¥Alice updates the headline to "Baz", ... (POST Request of a Form)

现在,在这个场景的最后阶段,必须再次从数据库中读取博客文章,然后才能应用 Alice 的标题。此时,你将需要检查博客文章是否仍为版本 1(在这种情况下不是)。

¥Now at the last stage of this scenario the blog post has to be read again from the database before Alice's headline can be applied. At this point you will want to check if the blog post is still at version 1 (which it is not in this scenario).

正确使用乐观锁定,你必须将版本添加为额外的隐藏字段(或添加到会话中以提高安全性)。否则,你无法验证版本是否仍然是 Alice 执行博客文章的 GET 请求时最初从数据库读取的版本。如果发生这种情况,你可能会看到丢失的更新,而你想通过乐观锁定来防止这种情况。

¥Using optimistic locking correctly, you have to add the version as an additional hidden field (or into the session for more safety). Otherwise, you cannot verify the version is still the one being originally read from the database when Alice performed her GET request for the blog post. If this happens you might see lost updates you wanted to prevent with Optimistic Locking.

有关更多详细信息,请参阅 部分。

¥See the example code (frontend):

const res = await fetch('api.example.com/book/123');
const book = res.json();
console.log(book.version); // prints the current version

// user does some changes and calls the PUT handler
const changes = { title: 'new title' };
await fetch('api.example.com/book/123', {
method: 'PUT',
body: {
...changes,
version: book.version,
},
});

相应的 API 端点:

¥And the corresponding API endpoints:

// GET /book/:id
async findOne(req, res) {
const book = await this.em.findOne(Book, +req.query.id);
res.json(book);
}

// PUT /book/:id
async update(req, res) {
const book = await this.em.findOne(Book, +req.query.id, { lockMode: LockMode.OPTIMISTIC, lockVersion: req.body.version });
wrap(book).assign(req.body);
await this.em.flush();

res.json(book);
}

你的前端应用从 API 加载实体,响应包含版本属性。用户进行一些更改并将 PUT 请求发回 API,版本字段包含在有效负载中。然后,API 的 PUT 处理程序会读取版本并将其传递给 em.findOne() 调用。

¥Your frontend app loads an entity from API, the response includes the version property. User makes some changes and fires PUT request back to the API, with version field included in the payload. The PUT handler of the API then reads the version and passes it to the em.findOne() call.

悲观锁定

¥Pessimistic Locking

MikroORM 在数据库级别支持悲观锁定。没有尝试在 MikroORM 内部实现悲观锁定,而是使用特定于供应商和 ANSI-SQL 命令来获取行级锁定。每个实体都可以成为悲观锁的一部分,使用此功能不需要特殊的元数据。

¥MikroORM supports Pessimistic Locking at the database level. No attempt is being made to implement pessimistic locking inside MikroORM, rather vendor-specific and ANSI-SQL commands are used to acquire row-level locks. Every Entity can be part of a pessimistic lock, there is no special metadata required to use this feature.

要使悲观锁定工作,需要一个开放事务。如果你尝试获取悲观锁并且没有事务正在运行,MikroORM 将抛出异常。

¥For Pessimistic Locking to work, an open transaction is required. MikroORM will throw an Exception if you attempt to acquire a pessimistic lock and no transaction is running.

MikroORM 目前支持 6 种悲观锁模式:

¥MikroORM currently supports 6 pessimistic lock modes:

模式PostgresMySQL
LockMode.PESSIMISTIC_READfor sharelock in share mode
LockMode.PESSIMISTIC_WRITEfor updatefor update
LockMode.PESSIMISTIC_PARTIAL_WRITEfor update skip lockedfor update skip locked
LockMode.PESSIMISTIC_WRITE_OR_FAILfor update nowaitfor update nowait
LockMode.PESSIMISTIC_PARTIAL_READfor share skip lockedlock in share mode skip locked
LockMode.PESSIMISTIC_READ_OR_FAILfor share nowaitlock in share mode nowait

你可以在三种不同的场景中使用悲观锁:

¥You can use pessimistic locks in three different scenarios:

  1. 使用 em.findOne(className, id, { lockMode: LockMode.PESSIMISTIC_WRITE })em.findOne(className, id, { lockMode: LockMode.PESSIMISTIC_READ })

    ¥Using em.findOne(className, id, { lockMode: LockMode.PESSIMISTIC_WRITE }) or em.findOne(className, id, { lockMode: LockMode.PESSIMISTIC_READ })

  2. 使用 em.lock(entity, LockMode.PESSIMISTIC_WRITE)em.lock(entity, LockMode.PESSIMISTIC_READ)

    ¥Using em.lock(entity, LockMode.PESSIMISTIC_WRITE) or em.lock(entity, LockMode.PESSIMISTIC_READ)

  3. 使用 QueryBuilder.setLockMode(LockMode.PESSIMISTIC_WRITE)QueryBuilder.setLockMode(LockMode.PESSIMISTIC_READ)

    ¥Using QueryBuilder.setLockMode(LockMode.PESSIMISTIC_WRITE) or QueryBuilder.setLockMode(LockMode.PESSIMISTIC_READ)

我们还可以选择通过 lockTableAliases 选项传递我们想要锁定的表别名列表:

¥Optionally we can also pass list of table aliases we want to lock via lockTableAliases option:

const res = await em.find(User, { name: 'Jon' }, {
populate: ['identities'],
strategy: LoadStrategy.JOINED,
lockMode: LockMode.PESSIMISTIC_READ,
lockTableAliases: ['u0'],
});

// select ...
// from "user" as "u0"
// left join "identity" as "i1" on "u0"."id" = "i1"."user_id"
// where "u0"."name" = 'Jon'
// for update of "u0" skip locked

隔离级别

¥Isolation levels

我们可以设置事务隔离级别:

¥We can set the transaction isolation levels:

await orm.em.transactional(async em => {
// ...
}, { isolationLevel: IsolationLevel.READ_UNCOMMITTED });

可用的隔离级别:

¥Available isolation levels:

  • IsolationLevel.READ_UNCOMMITTED

  • IsolationLevel.READ_COMMITTED

  • IsolationLevel.SNAPSHOT

  • IsolationLevel.REPEATABLE_READ

  • IsolationLevel.SERIALIZABLE

禁用事务

¥Disabling transactions

从 v5.7 开始,可以通过 disableTransactions 配置选项全局禁用事务,也可以在使用 em.transactional() 时本地禁用事务。

¥Since v5.7 is it possible to disable transactions, either globally via disableTransactions config option, or locally when using em.transactional().

// only the outer transaction will be opened
await orm.em.transactional(async em => {
// but the inner calls to both em.transactional and em.begin will be no-op
await em.transactional(...);
}, { disableTransactions: true });

或者,你可以在创建新分支时禁用事务:

¥Alternatively, you can disable transactions when creating new forks:

const em = await orm.em.fork({ disableTransactions: true });
await em.transactional(...); // no-op
await em.begin(...); // no-op
await em.commit(...); // commit still calls `flush`

本文档的这一部分受到 doctrine 内部文档 的极大启发,因为这里的行为几乎相同。

¥This part of documentation is highly inspired by doctrine internals docs as the behaviour here is pretty much the same.