Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
# docker run --rm -it --volume $PWD:/app --net="host" -w /app ghcr.io/patchlevel/php:8.5
services:
php:
image: ghcr.io/patchlevel/php:8.5
volumes:
- .:/app
working_dir: /app
network_mode: host
tty: true
stdin_open: true
command: sleep infinity

postgres:
image: postgres:alpine
environment:
Expand All @@ -13,4 +24,4 @@ services:
- MYSQL_ALLOW_EMPTY_PASSWORD="yes"
- MYSQL_DATABASE=eventstore
ports:
- 3306:3306
- 3306:3306
54 changes: 54 additions & 0 deletions docs/UPGRADE-4.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,60 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
RetryStrategyRepository::withDefault($retryStrategy),
);
```
### Subscription Engine Commands

The `SubscriptionEngine` interface has been changed.
The methods `setup`, `boot`, `run`, `teardown`, `remove`, `reactivate`, `pause` and `refresh` have been replaced
by a single `execute` method that takes a command object.
The `ids` and `groups` filters, previously passed via `SubscriptionEngineCriteria`,
are now constructor parameters of the command objects.
The `SubscriptionEngineCriteria` is now only used for the `subscriptions` method.

before:

```php
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->setup(new SubscriptionEngineCriteria(ids: ['profile_1']), skipBooting: true);
$subscriptionEngine->boot(new SubscriptionEngineCriteria(ids: ['profile_1']), limit: 100);
$subscriptionEngine->run(new SubscriptionEngineCriteria(ids: ['profile_1']), limit: 100);
$subscriptionEngine->teardown(new SubscriptionEngineCriteria(ids: ['profile_1']));
$subscriptionEngine->remove(new SubscriptionEngineCriteria(ids: ['profile_1']));
$subscriptionEngine->reactivate(new SubscriptionEngineCriteria(ids: ['profile_1']));
$subscriptionEngine->pause(new SubscriptionEngineCriteria(ids: ['profile_1']));
$subscriptionEngine->refresh(new SubscriptionEngineCriteria(ids: ['profile_1']));
```
after:

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Pause;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Reactivate;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Refresh;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Run;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->execute(new Setup(ids: ['profile_1'], skipBooting: true));
$subscriptionEngine->execute(new Boot(ids: ['profile_1'], limit: 100));
$subscriptionEngine->execute(new Run(ids: ['profile_1'], limit: 100));
$subscriptionEngine->execute(new Teardown(ids: ['profile_1']));
$subscriptionEngine->execute(new Remove(ids: ['profile_1']));
$subscriptionEngine->execute(new Reactivate(ids: ['profile_1']));
$subscriptionEngine->execute(new Pause(ids: ['profile_1']));
$subscriptionEngine->execute(new Refresh(ids: ['profile_1']));
```
Further changes:

* The `CanRefreshSubscriptions` interface has been removed. Refresh is now part of the `SubscriptionEngine` interface via the `Refresh` command.
* `ProcessedResult` now extends `Result`, so the `execute` method always returns a `Result`. The `Boot` and `Run` commands return a `ProcessedResult`.
* The `DefaultSubscriptionEngine` accepts an optional `EventDispatcherInterface` as last constructor argument to hook into the engine with own listeners.

## Store

### StreamStore
Expand Down
2 changes: 2 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ To manage your subscriptions there are the following cli commands.
* SubscriptionBootCommand: `event-sourcing:subscription:boot`
* SubscriptionPauseCommand: `event-sourcing:subscription:pause`
* SubscriptionReactiveCommand: `event-sourcing:subscription:reactive`
* SubscriptionRefreshCommand: `event-sourcing:subscription:refresh`
* SubscriptionRemoveCommand: `event-sourcing:subscription:remove`
* SubscriptionRunCommand: `event-sourcing:subscription:run`
* SubscriptionSetupCommand: `event-sourcing:subscription:setup`
Expand Down Expand Up @@ -86,6 +87,7 @@ $cli->addCommands([
new Command\SubscriptionTeardownCommand($subscriptionEngine),
new Command\SubscriptionRemoveCommand($subscriptionEngine),
new Command\SubscriptionReactivateCommand($subscriptionEngine),
new Command\SubscriptionRefreshCommand($subscriptionEngine),
new Command\SubscriptionSetupCommand($subscriptionEngine),
new Command\SubscriptionStatusCommand($subscriptionEngine),
new Command\SchemaCreateCommand($schemaDirector),
Expand Down
3 changes: 2 additions & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;

Expand All @@ -358,7 +359,7 @@ $schemaDirector = new DoctrineSchemaDirector(
$schemaDirector->create();

/** @var SubscriptionEngine $engine */
$engine->setup(skipBooting: true);
$engine->execute(new Setup(skipBooting: true));
```

:::note
Expand Down
116 changes: 81 additions & 35 deletions docs/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,8 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([
Now we can create the subscription engine and plug together the necessary services.
The message loader is needed to load the messages, the Subscription Store to store the subscription state
and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy.
Finally, if we want to use the cleanup feature, we need to pass the cleanup handlers.
If we want to use the cleanup feature, we need to pass the cleanup handlers.
Finally, we can pass an event dispatcher to hook into the engine with own listeners.

```php
use Doctrine\DBAL\Connection;
Expand All @@ -1153,6 +1154,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Symfony\Component\EventDispatcher\EventDispatcher;

/**
* @var MessageLoader $messageLoader
Expand All @@ -1169,6 +1171,34 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
$retryStrategyRepository, // optional, if not set the default retry strategy is used
$logger, // optional
new DefaultCleaner([new DbalCleanupTaskHandler($projectionConnection)]), // optional but required if you want to use the cleanup feature
new EventDispatcher(), // optional, to hook into the engine with own listeners
);
```
### Engine Events

The `DefaultSubscriptionEngine` dispatches events during processing on the passed event dispatcher.
You can register your own listeners to hook into the engine, for example for logging, metrics or batching.

| Event | Description |
|--------------------------|----------------------------------------------------------------------|
| `OnCommand` | A command was passed to the engine for execution |
| `OnSubscriptions` | The engine determined the subscriptions for the current command |
| `OnHandleMessage` | A message is about to be passed to a subscriber |
| `OnHandleMessageSuccess` | A message was successfully handled by a subscriber |
| `OnHandleMessageError` | An error occurred while a subscriber was handling a message |
| `OnProcessingFinished` | The engine finished processing the stream (ended or limit reached) |
| `OnResult` | The engine finished the command and returns the result |

```php
use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError;
use Symfony\Component\EventDispatcher\EventDispatcher;

$eventDispatcher = new EventDispatcher();
$eventDispatcher->addListener(
OnHandleMessageError::class,
static function (OnHandleMessageError $event): void {
// own error handling like logging or metrics
},
);
```
### Catch up Subscription Engine
Expand Down Expand Up @@ -1249,15 +1279,20 @@ Especially in combination with the `CatchUpSubscriptionEngine` and `ThrowOnError

## Usage

The Subscription Engine has a few methods needed to use it effectively.
A `SubscriptionEngineCriteria` can be passed to all of these methods to filter the respective subscriptions.
The Subscription Engine is controlled with command objects.
Each command is passed to the `execute` method, which returns a `Result` with the errors that occurred.
Every command accepts `ids` and `groups` parameters to filter the subscriptions the command should be applied to.

```php
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Run;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

$criteria = new SubscriptionEngineCriteria(
ids: ['profile_1', 'welcome_email'],
groups: ['default'],
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->execute(
new Run(
ids: ['profile_1', 'welcome_email'],
groups: ['default'],
),
);
```

Expand All @@ -1272,52 +1307,62 @@ In this step, the subscription engine also tries to call the `setup` method if a
After the setup process, the subscription is set to booting or active.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->setup(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Setup());
```

:::tip
You can skip the booting step with the second boolean parameter named `skipBooting`.
You can skip the booting step with the `skipBooting` parameter: `new Setup(skipBooting: true)`.
:::

### Boot

You can boot the subscriptions with the `boot` method.
You can boot the subscriptions with the `Boot` command.
All booting subscriptions will catch up to the current event stream.
After the boot process, the subscription is set to active or finished.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->boot(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Boot());
```

:::tip
You can limit the number of processed messages with the `limit` parameter: `new Boot(limit: 100)`.
:::

### Run

All active subscriptions are continued and updated here.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Run;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->run(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Run());
```

:::tip
You can limit the number of processed messages with the `limit` parameter: `new Run(limit: 100)`.
:::

### Teardown

If subscriptions are detached, they can be cleaned up here.
The subscription engine also tries to call the `teardown` method if available.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->teardown(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Teardown());
```
### Remove

Expand All @@ -1326,23 +1371,23 @@ An attempt is made to call the `teardown` method if available.
But the entry will still be removed if it doesn't work.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->remove(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Remove());
```
### Reactivate

If a subscription had an error or is outdated, you can reactivate it.
As a result, the subscription gets in the last status again.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Reactivate;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->reactivate(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Reactivate());
```
### Pause

Expand All @@ -1351,38 +1396,39 @@ The subscription will then no longer be managed by the subscription engine.
You can reactivate the subscription if you want so that it continues.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Pause;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->pause(new SubscriptionEngineCriteria());
$subscriptionEngine->execute(new Pause());
```
### Status
### Refresh

To get the current status of all subscriptions, you can get them using the `subscriptions` method.
If you change the metadata of a subscriber in the code (e.g. `runMode`, `group` or `cleanupTasks`),
you can use the `Refresh` command to update the existing subscriptions in the store.

```php
use Patchlevel\EventSourcing\Subscription\Engine\Command\Refresh;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptions = $subscriptionEngine->subscriptions(new SubscriptionEngineCriteria());

foreach ($subscriptions as $subscription) {
echo $subscription->status()->value;
}
$subscriptionEngine->execute(new Refresh());
```
### Refresh
### Status

If you change the metadata of a subscriber in the code (e.g. `runMode`, `group` or `cleanupTasks`),
you can use the `refresh` method to update the existing subscriptions in the store.
To get the current status of all subscriptions, you can get them using the `subscriptions` method.
A `SubscriptionEngineCriteria` can be passed to filter the subscriptions.

```php
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->refresh(new SubscriptionEngineCriteria());
$subscriptions = $subscriptionEngine->subscriptions(new SubscriptionEngineCriteria());

foreach ($subscriptions as $subscription) {
echo $subscription->status()->value;
}
```
## Learn more

Expand Down
18 changes: 6 additions & 12 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ parameters:
count: 1
path: src/Store/TaggableDoctrineDbalStore.php

-
message: '#^Parameter \#1 \$command of callable Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\BootHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\PauseHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\ReactivateHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\RefreshHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\RemoveHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\RunHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\SetupHandler\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Handler\\TeardownHandler expects Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Boot\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Pause\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Reactivate\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Refresh\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Remove\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Run\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Setup\|Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Teardown, Patchlevel\\EventSourcing\\Subscription\\Engine\\Command\\Command given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/Engine/DefaultSubscriptionEngine.php

-
message: '#^Parameter \#1 \$eventClass of method Patchlevel\\EventSourcing\\Metadata\\Event\\EventRegistry\:\:eventName\(\) expects class\-string, string given\.$#'
identifier: argument.type
Expand Down Expand Up @@ -396,18 +402,6 @@ parameters:
count: 2
path: tests/Unit/QueryBus/ServiceHandlerProviderTest.php

-
message: '#^Match expression does not handle remaining value\: string$#'
identifier: match.unhandled
count: 1
path: tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php

-
message: '#^Parameter \#1 \$error of static method Patchlevel\\EventSourcing\\Subscription\\ThrowableToErrorContextTransformer\:\:transform\(\) expects Throwable, Throwable\|null given\.$#'
identifier: argument.type
count: 8
path: tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php

-
message: '#^Offset ''args'' on array\{file\: literal\-string&non\-falsy\-string, line\: int, function\: ''createException'', class\: ''Patchlevel\\\\EventSourcing\\\\Tests\\\\Unit\\\\Subscription\\\\ErrorContextTest'', type\: ''\-\>'', args\: array\<mixed\>\} on left side of \?\? always exists and is not nullable\.$#'
identifier: nullCoalesce.offset
Expand Down
Loading
Loading