21 changes: 16 additions & 5 deletions docs/guides/parallel-scraping/parallel-scraper.mjs
@@ -1,5 +1,6 @@
import { fork } from 'node:child_process';

import { FileSystemStorageClient } from '@crawlee/fs-storage';
import { Configuration, Dataset, PlaywrightCrawler, log } from 'crawlee';

import { router } from './routes.mjs';
@@ -76,13 +77,19 @@ if (!process.env.IN_WORKER_THREAD) {
// Get the request queue
const requestQueue = await getOrInitQueue(false);

// Disable the automatic purge on start and configure crawlee to store the worker-specific data in a separate directory
// (needs to be done AFTER the queue is initialized when running locally)
// Disable the automatic purge on start, so we don't lose the queue we prepared
const config = new Configuration({
purgeOnStart: false,
storageClientOptions: {
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
},
});

// Store the worker's own internal state (its default dataset, key-value store, etc.) in a separate
// directory so the workers don't collide with each other (needs to be done AFTER the queue is
// initialized when running locally). This directory is private to a single worker, so we set
// `assumeSoleOwner: true` — the concurrency-safe locking only matters for the shared `shop-urls`
// queue, which gets its own storage client in `requestQueue.mjs`.
const storageClient = new FileSystemStorageClient({
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
assumeSoleOwner: true,
});

workerLogger.debug('Setting up crawler.');
@@ -98,6 +105,10 @@ if (!process.env.IN_WORKER_THREAD) {
// highlight-end
// Let's also limit the crawler's concurrency, we don't want to overload a single process 🐌
maxConcurrency: 5,
// Use the worker-specific storage client we created above (private to this worker, so it needs no cross-process locking)
// highlight-start
storageClient,
// highlight-end
},
config,
);
46 changes: 33 additions & 13 deletions docs/guides/parallel-scraping/parallel-scraping.mdx
@@ -60,6 +60,16 @@ The first step in our conversion process will be creating a common file (let's c

The exported function, `getOrInitQueue`, might seem like it does a lot. In essence, it just ensures the request queue is initialized, and if requested, ensures it starts off with an empty state.

:::caution Make the shared queue concurrency-safe with `assumeSoleOwner: false`

Because every worker process opens this same `shop-urls` queue at the same time, it **must** use the concurrency-safe locking behavior of `FileSystemStorageClient`. That's why `getOrInitQueue` opens the queue with a storage client constructed with `assumeSoleOwner: false`.

By default, `FileSystemStorageClient` assumes it is the *sole* consumer of a queue (`assumeSoleOwner: true`). On open it immediately reclaims any requests left *in progress* — great for a single-process crawl recovering after a crash, but disastrous when workers run side by side: each worker would happily grab requests another worker is still processing, so the same URL gets scraped multiple times.

Setting `assumeSoleOwner: false` tells the client to treat an in-progress request as a potential live peer's lock and only reclaim it once the lock expires on the wall clock, so two workers never process the same request at once.

:::
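
To make the difference concrete, here is a toy model of the two reclaim strategies. It is a minimal sketch, not Crawlee's actual implementation: `openQueue`, the `disk` map, and `lockMs` are hypothetical names, a plain `Map` stands in for the files under `./storage`, and handled requests are not modeled at all.

```javascript
// Toy model only: `openQueue`, `disk`, and `lockMs` are hypothetical names, not Crawlee APIs.
// `disk` maps each URL to the time (in ms) its lock expires, or null when nobody holds it.
function openQueue(disk, { assumeSoleOwner, lockMs = 60_000 }) {
    if (assumeSoleOwner) {
        // A sole owner treats every in-progress request as crash leftovers and reclaims it on open.
        for (const url of disk.keys()) disk.set(url, null);
    }

    return {
        // Hand out the first request that is free or whose lock has expired, and lock it.
        fetchNext(now) {
            for (const [url, lockedUntil] of disk) {
                if (lockedUntil === null || lockedUntil <= now) {
                    disk.set(url, now + lockMs);
                    return url;
                }
            }
            return null;
        },
    };
}

// Run the same scenario under both settings: worker A takes the only request,
// then worker B opens the queue and asks for work one second later.
function simulate(assumeSoleOwner) {
    const disk = new Map([['https://example.com/product/1', null]]);
    const workerA = openQueue(disk, { assumeSoleOwner });
    const takenByA = workerA.fetchNext(0);
    const workerB = openQueue(disk, { assumeSoleOwner });
    const takenByBEarly = workerB.fetchNext(1_000);
    const takenByBAfterExpiry = workerB.fetchNext(60_000);
    return { takenByA, takenByBEarly, takenByBAfterExpiry };
}

const shared = simulate(false);
const soleOwner = simulate(true);

console.log(shared.takenByBEarly); // null: worker A's lock is still live
console.log(soleOwner.takenByBEarly); // the URL worker A is still processing: a duplicate scrape
```

In the shared scenario, worker B only receives the request once worker A's lock has expired (`shared.takenByBAfterExpiry`), which is the behavior the caution above describes for `assumeSoleOwner: false`.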

### Adapting our previous scraper to enqueue the product URLs to the new queue

In the `src/routes.mjs` file of the scraper we previously built, we have a handler for the `CATEGORY` label. Let's adapt that handler to enqueue the product URLs to the new queue we created.
@@ -122,34 +132,44 @@ This will check how the script is executed as. If this value has _any_ value, it

We use this to ensure the parent process stays alive until all the worker processes exit. Otherwise, the worker processes would just get spawned, and lose the ability to communicate with the parent. You might not need this depending on your use case (maybe you just need to spawn workers and let them process).

#### What's with all those `Configuration` calls?
#### What's with all the `Configuration` and storage client setup?

There are three steps we want to do for the worker processes:
There are two things we want to do for the worker processes:

- get the queue that supports locking from the same location as the parent process
- ensure the default storages do **not** get purged on start, as otherwise we'd lose the queue we prepared, and initialize a special storage for worker processes so they do not collide with each other
- get the shared queue from the same location as the parent process (it already comes with the concurrency-safe storage client we set up in `requestQueue.mjs`)
- ensure the default storages do **not** get purged on start, as otherwise we'd lose the queue we prepared, and give each worker its own private storage directory for its internal state so the workers don't collide with each other

In order, that's what these lines do:

```javascript title="src/parallel-scraper.mjs"
// Get the request queue from the parent process (step 1)
import { FileSystemStorageClient } from '@crawlee/fs-storage';

// Get the shared request queue from the parent process (step 1)
const requestQueue = await getOrInitQueue(false);

// Disable the automatic purge on start and configure crawlee to store the worker-specific data
// in a separate directory (needs to be done AFTER the queue is initialized when running locally) (step 2)
const config = new Configuration({
purgeOnStart: false,
storageClientOptions: {
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
},
// Disable the automatic purge on start, so we don't lose the queue we prepared (step 2)
const config = new Configuration({ purgeOnStart: false });

// Store the worker's own internal state in a separate directory so workers don't collide (step 2,
// cont.). Needs to be done AFTER the queue is initialized when running locally. This directory is
// private to a single worker, so we explicitly set `assumeSoleOwner: true`.
const storageClient = new FileSystemStorageClient({
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
assumeSoleOwner: true,
});
```

:::note Why no `assumeSoleOwner: false` here?

Each worker's `./storage/worker-N` directory is private to that single worker — nothing else opens it — so the default `assumeSoleOwner: true` is exactly right. The concurrency-safe locking only matters for storage that is genuinely shared across processes, which is the `shop-urls` queue in `requestQueue.mjs`, not this per-worker internal state.

:::

#### Telling the crawler to use the worker configuration

You might have noticed several lines highlighted in the code above. Those show how you provide the shared request queue to the crawler.

You might have also noticed we passed in a second parameter to the constructor of the crawler, the `config` variable we created earlier. This is needed to ensure the crawler uses the worker-specific storages for internal states, and that they do not collide with each other.
You might have also noticed that we passed the `config` and `storageClient` we created earlier to the crawler. Together they ensure the crawler keeps its own internal state in worker-specific storage (so the workers do not collide with each other), while still consuming the shared, concurrency-safe `shop-urls` queue we provided explicitly.

#### Why do we use `process.send` instead of `context.pushData`?

12 changes: 10 additions & 2 deletions docs/guides/parallel-scraping/shared.mjs
@@ -1,8 +1,16 @@
import { FileSystemStorageClient } from '@crawlee/fs-storage';
import { RequestQueue } from 'crawlee';

// The request queue shared by all the parallel workers
let queue;

// The `shop-urls` queue is opened concurrently by every worker process, so it must use the
// concurrency-safe locking behavior. With `assumeSoleOwner: false`, a request another worker is
// still processing is treated as a live peer's lock and is not handed out again until that lock
// expires — so two workers never scrape the same URL at once. (We point at the default `./storage`
// location, which is where this shared queue lives.)
const sharedStorageClient = new FileSystemStorageClient({ assumeSoleOwner: false });

/**
* @param {boolean} makeFresh Whether the queue should be cleared before returning it
* @returns The queue
@@ -12,11 +20,11 @@ export async function getOrInitQueue(makeFresh = false) {
return queue;
}

queue = await RequestQueue.open('shop-urls');
queue = await RequestQueue.open('shop-urls', { storageClient: sharedStorageClient });

if (makeFresh) {
await queue.drop();
queue = await RequestQueue.open('shop-urls');
queue = await RequestQueue.open('shop-urls', { storageClient: sharedStorageClient });
}

return queue;
17 changes: 16 additions & 1 deletion docs/upgrading/upgrading_v4.md
@@ -683,12 +683,27 @@ const storageClient = new FileSystemStorageClient();
const inMemory = new MemoryStorageClient();
```

The `localDataDirectory`, `persistStorage`, and `writeMetadata` options are still accepted by `MemoryStorageClient` for source compatibility, but they are ignored — in-memory storage has nowhere to write. `FileSystemStorageClient` honors `localDataDirectory` and `writeMetadata`; it always persists, so it has no `persistStorage` option.
`MemoryStorageClient` no longer takes the `localDataDirectory`, `persistStorage`, or `writeMetadata` options — in-memory storage has nowhere to write, so they had no meaning. `FileSystemStorageClient` honors `localDataDirectory`; it always persists, so it has no `persistStorage` option, and the `writeMetadata` option has been removed there too (see [`writeMetadata` option removed](#writemetadata-option-removed)).

### No request lock expiry in `MemoryStorageClient`

Because the in-memory queue lives entirely within a single process and is never shared with another consumer, `MemoryStorageClient`'s request queue no longer uses an expiring, cross-process lock. A fetched request simply stays *in progress* until it is handled or reclaimed; it never becomes fetchable again on its own after a timeout. `setExpectedRequestProcessingTimeSecs()` is therefore a no-op for in-memory storage. (Disk-backed `FileSystemStorageClient` keeps the lock-with-expiry behavior.)

### `writeMetadata` option removed

`FileSystemStorageClient` no longer accepts the `writeMetadata` option. The underlying file-system storage now always writes metadata files (`__metadata__.json` for each storage and a `<key>.__metadata__.json` sidecar for each key-value record), so the toggle no longer had any effect. Remove it from your storage client options:

```diff
import { FileSystemStorageClient } from '@crawlee/fs-storage';

const storageClient = new FileSystemStorageClient({
localDataDirectory: './storage',
- writeMetadata: true,
});
```

`MemoryStorageClient` never accepted `writeMetadata` (it has no on-disk format to begin with), so there is nothing to change there.
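
If your storage options are assembled in one place, the removed keys can be stripped before they reach the constructor. This is a minimal sketch that assumes a plain options object; `toV4FsStorageOptions` is a hypothetical helper, not part of Crawlee:

```javascript
// Hypothetical helper (not a Crawlee API): drop the options that
// `FileSystemStorageClient` no longer accepts and keep everything else.
function toV4FsStorageOptions(legacyOptions = {}) {
    const { writeMetadata, persistStorage, ...supported } = legacyOptions;
    return supported;
}

const options = toV4FsStorageOptions({
    localDataDirectory: './storage',
    writeMetadata: true,
    persistStorage: true,
});

console.log(options); // { localDataDirectory: './storage' }
```

The cleaned object can then be passed to `new FileSystemStorageClient(options)`.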

## Multiple crawler instances use separate default request queues

In v3, every `BasicCrawler` (or subclass) that didn't receive an explicit `requestQueue` option would open the same default request queue. If you created two crawlers in the same process, they would silently share a queue — leading to request collisions and hard-to-debug deduplication issues.
7 changes: 2 additions & 5 deletions packages/fs-storage/package.json
@@ -42,14 +42,11 @@
"access": "public"
},
"dependencies": {
"@crawlee/fs-storage-native": "0.1.5-beta.0",
"@crawlee/types": "workspace:*",
"@sapphire/async-queue": "^1.5.5",
"@sapphire/shapeshift": "^4.0.0",
"content-type": "^1.0.5",
"fs-extra": "^11.3.0",
"json5": "^2.2.3",
"mime-types": "^3.0.1",
"proper-lockfile": "^4.1.2",
"tslib": "^2.8.1"
"mime-types": "^3.0.1"
}
}
89 changes: 0 additions & 89 deletions packages/fs-storage/src/background-handler/fs-utils.ts

This file was deleted.

51 changes: 0 additions & 51 deletions packages/fs-storage/src/background-handler/index.ts

This file was deleted.
