Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 67 additions & 58 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from apify._proxy_configuration import ProxyConfiguration
from apify._utils import docs_group, docs_name, ensure_context, get_system_info, is_running_in_ipython
from apify._webhook import to_client_representations
from apify.errors import map_client_errors
from apify.events import ApifyEventManager, EventManager, LocalEventManager
from apify.log import _configure_logging, logger
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
Expand Down Expand Up @@ -936,17 +937,18 @@ async def start(
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"inherit"`, or a `timedelta`.')

actor_client = client.actor(actor_id)
return await actor_client.start(
run_input=run_input,
content_type=content_type,
build=build,
max_total_charge_usd=max_total_charge_usd,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=actor_start_timeout,
force_permission_level=force_permission_level,
webhooks=to_client_representations(webhooks),
)
with map_client_errors():
return await actor_client.start(
run_input=run_input,
content_type=content_type,
build=build,
max_total_charge_usd=max_total_charge_usd,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=actor_start_timeout,
force_permission_level=force_permission_level,
webhooks=to_client_representations(webhooks),
)

@_ensure_context
async def abort(
Expand Down Expand Up @@ -975,10 +977,11 @@ async def abort(
client = self.new_client(token=token) if token else self.apify_client
run_client = client.run(run_id)

if status_message:
await run_client.update(status_message=status_message)
with map_client_errors():
if status_message:
await run_client.update(status_message=status_message)

run = await run_client.abort(gracefully=gracefully)
run = await run_client.abort(gracefully=gracefully)

if run is None:
raise RuntimeError(f'Failed to abort Actor run with ID "{run_id}".')
Expand Down Expand Up @@ -1047,19 +1050,20 @@ async def call(
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"inherit"`, or a `timedelta`.')

actor_client = client.actor(actor_id)
run = await actor_client.call(
run_input=run_input,
content_type=content_type,
build=build,
max_total_charge_usd=max_total_charge_usd,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=actor_call_timeout,
force_permission_level=force_permission_level,
webhooks=to_client_representations(webhooks),
wait_duration=wait,
logger=logger,
)
with map_client_errors():
run = await actor_client.call(
run_input=run_input,
content_type=content_type,
build=build,
max_total_charge_usd=max_total_charge_usd,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=actor_call_timeout,
force_permission_level=force_permission_level,
webhooks=to_client_representations(webhooks),
wait_duration=wait,
logger=logger,
)

if run is None:
raise RuntimeError(f'Failed to call Actor with ID "{actor_id}".')
Expand Down Expand Up @@ -1120,15 +1124,16 @@ async def call_task(
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"inherit"`, or a `timedelta`.')

task_client = client.task(task_id)
run = await task_client.call(
task_input=task_input,
build=build,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=task_call_timeout,
webhooks=to_client_representations(webhooks),
wait_duration=wait,
)
with map_client_errors():
run = await task_client.call(
task_input=task_input,
build=build,
restart_on_error=restart_on_error,
memory_mbytes=memory_mbytes,
run_timeout=task_call_timeout,
webhooks=to_client_representations(webhooks),
wait_duration=wait,
)

if run is None:
raise RuntimeError(f'Failed to call Task with ID "{task_id}".')
Expand Down Expand Up @@ -1171,12 +1176,13 @@ async def metamorph(
if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

await self.apify_client.run(self.configuration.actor_run_id).metamorph(
target_actor_id=target_actor_id,
run_input=run_input,
target_actor_build=target_actor_build,
content_type=content_type,
)
with map_client_errors():
await self.apify_client.run(self.configuration.actor_run_id).metamorph(
target_actor_id=target_actor_id,
run_input=run_input,
target_actor_build=target_actor_build,
content_type=content_type,
)

if custom_after_sleep:
await asyncio.sleep(custom_after_sleep.total_seconds())
Expand Down Expand Up @@ -1242,7 +1248,8 @@ async def safe_dispatch(listener: Any, data: Any) -> None:
except TimeoutError:
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')

await self.apify_client.run(self.configuration.actor_run_id).reboot()
with map_client_errors():
await self.apify_client.run(self.configuration.actor_run_id).reboot()
except BaseException:
# Reset the flag so that a failed or cancelled reboot can be retried.
self._is_rebooting = False
Expand Down Expand Up @@ -1283,17 +1290,18 @@ async def add_webhook(self, webhook: Webhook, *, idempotency_key: str | None = N
if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

await self.apify_client.webhooks().create(
actor_run_id=self.configuration.actor_run_id,
event_types=webhook.event_types,
request_url=webhook.request_url,
payload_template=webhook.payload_template,
headers_template=webhook.headers_template,
ignore_ssl_errors=webhook.ignore_ssl_errors,
do_not_retry=webhook.do_not_retry,
idempotency_key=idempotency_key if idempotency_key is not None else webhook.idempotency_key,
is_ad_hoc=True,
)
with map_client_errors():
await self.apify_client.webhooks().create(
actor_run_id=self.configuration.actor_run_id,
event_types=webhook.event_types,
request_url=webhook.request_url,
payload_template=webhook.payload_template,
headers_template=webhook.headers_template,
ignore_ssl_errors=webhook.ignore_ssl_errors,
do_not_retry=webhook.do_not_retry,
idempotency_key=idempotency_key if idempotency_key is not None else webhook.idempotency_key,
is_ad_hoc=True,
)

@_ensure_context
async def set_status_message(
Expand Down Expand Up @@ -1321,10 +1329,11 @@ async def set_status_message(
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

run_client = self.apify_client.run(self.configuration.actor_run_id)
run = await run_client.update(
status_message=status_message,
is_status_message_terminal=is_terminal,
)
with map_client_errors():
run = await run_client.update(
status_message=status_message,
is_status_message_terminal=is_terminal,
)

if run is None:
raise RuntimeError(
Expand Down
7 changes: 5 additions & 2 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from apify_client._models import PricingPerEvent as ClientPricingPerEvent

from apify._utils import ReentrantLock, docs_group, ensure_context
from apify.errors import map_client_errors
from apify.log import logger
from apify.storages import Dataset

Expand Down Expand Up @@ -449,7 +450,8 @@ async def charge(self, event_name: str, *, count: int = 1) -> ChargeResult:
# the platform handles them automatically based on dataset writes.
pass
elif event_name in self._pricing_info:
await self._client.run(self._actor_run_id).charge(event_name, count=charged_count)
with map_client_errors():
await self._client.run(self._actor_run_id).charge(event_name, count=charged_count)
elif event_name in self._tier_priced_events:
logger.warning(
f"Event '{event_name}' is tier-priced and is not chargeable via the pay-per-event API."
Expand Down Expand Up @@ -572,7 +574,8 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')

run = await self._client.run(self._actor_run_id).get()
with map_client_errors():
run = await self._client.run(self._actor_run_id).get()

if run is None:
raise RuntimeError('Actor run not found')
Expand Down
1 change: 1 addition & 0 deletions src/apify/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def is_running_in_ipython() -> bool:
'Actor',
'Charging',
'Configuration',
'Errors',
'Event data',
'Event managers',
'Events',
Expand Down
Loading