diff --git a/core/src/main/java/dev/vml/es/acm/core/code/Locker.java b/core/src/main/java/dev/vml/es/acm/core/code/Locker.java index 7a6e96cf..4c113693 100644 --- a/core/src/main/java/dev/vml/es/acm/core/code/Locker.java +++ b/core/src/main/java/dev/vml/es/acm/core/code/Locker.java @@ -131,4 +131,28 @@ private boolean isLock(Resource lock) { public boolean anyLocked() { return locks().findAny().isPresent(); } + + public boolean isLockStale(String lockName, long timeoutMillis) { + Calendar lockTime = getLockTime(lockName); + if (lockTime == null) { + return true; // non-existent as stale + } + + Calendar now = Calendar.getInstance(); + long diff = (now.getTimeInMillis() - lockTime.getTimeInMillis()); + + boolean isStale = diff> timeoutMillis; + if (isStale) { + LOG.debug("Lock '{}' is {} millis old, considering it stale (timeout: {} millis)", lockName, diff, timeoutMillis); + } + return isStale; + } + + private Calendar getLockTime(String lockName) { + Resource lock = getLock(lockName); + if (lock == null) { + return null; + } + return lock.getValueMap().get(LOCKED_PROP, Calendar.class); + } } diff --git a/core/src/main/java/dev/vml/es/acm/core/script/ScriptScheduler.java b/core/src/main/java/dev/vml/es/acm/core/script/ScriptScheduler.java index e7c68474..ac585603 100644 --- a/core/src/main/java/dev/vml/es/acm/core/script/ScriptScheduler.java +++ b/core/src/main/java/dev/vml/es/acm/core/script/ScriptScheduler.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.ResourceResolver; @@ -67,6 +66,8 @@ public class ScriptScheduler implements ResourceChangeListener, EventListener, J public static final String JOB_PROP_SCRIPT_PATH = "scriptPath"; + private static final String SCHEDULER_LOCK_NAME = 
"script-scheduler-leader"; + public enum JobType { BOOT, CRON; @@ -86,7 +87,7 @@ public static JobType of(String value) { public @interface Config { @AttributeDefinition(name = "Boot Delay", description = "Time in milliseconds to wait before booting scripts") - long bootDelay() default 1000 * 10; // 10 seconds + long bootDelay() default 1000; // 1 second @AttributeDefinition( name = "User Impersonation ID", @@ -99,11 +100,6 @@ public static JobType of(String value) { description = "Interval in milliseconds to retry health check if instance is not healthy") long healthCheckRetryInterval() default 1000 * 10; // 10 seconds - @AttributeDefinition( - name = "Health Check Retry Count On Deployment", - description = "Maximum number of retries when checking instance health on deployment") - long healthCheckRetryCountDeployment() default 90; // * 10 seconds = 15 minutes - @AttributeDefinition( name = "Health Check Retry Count On Boot", description = "Maximum number of retries when checking instance health on boot script execution") @@ -114,6 +110,12 @@ public static JobType of(String value) { description = "Maximum number of retries when checking instance health on cron schedule script execution") long healthCheckRetryCountCron() default 3; // * 10 seconds = 30 seconds + + @AttributeDefinition( + name = "Leader Lock Stale Timeout", + description = "Timeout in milliseconds after which the leader lock is considered stale (only on AEMaaCS)" + ) + long leaderLockStaleTimeout() default 1000 * 60 * 30; // 30 minutes } private Boolean instanceReady; @@ -147,8 +149,7 @@ protected void activate(Config config) { this.config = config; if (checkInstanceReady()) { - deployJobExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, DEPLOY_THREAD_NAME)); - deployJobExecutor.execute(this::deployJob); + bootWhenInstanceUp(); } } @@ -163,8 +164,14 @@ protected void deactivate() { deployJobExecutor.shutdownNow(); deployJobExecutor = null; } - unscheduleBoot(); - unscheduleScripts(); + 
boolean wasLeader = tryUnlock(); + if (wasLeader) { + LOG.info("Instance was scheduler leader - unscheduling jobs"); + unscheduleBoot(); + unscheduleScripts(); + } else { + LOG.info("Instance was not scheduler leader - keeping jobs"); + } bootedScripts.clear(); instanceReady = null; } @@ -185,21 +192,42 @@ public void onEvent(Event event) { } public void bootOnDemand() { - LOG.info("Automatic scripts booting on demand - job scheduling"); + LOG.info("Automatic scripts booting on demand - checking leadership"); + + if (!tryLock()) { + LOG.info("Instance is not scheduler leader - skipping job scheduling"); + return; + } + + LOG.info("Instance is scheduler leader - proceeding with job scheduling"); unscheduleBoot(); scheduleBoot(); LOG.info("Automatic scripts booting on demand - job scheduled"); } private void bootWhenInstanceUp() { - LOG.info("Automatic scripts booting on instance up - job scheduling"); + LOG.info("Automatic scripts booting on instance up - checking leadership"); + + if (!tryLock()) { + LOG.info("Instance is not scheduler leader - skipping job scheduling"); + return; + } + + LOG.info("Instance is scheduler leader - proceeding with job scheduling"); unscheduleBoot(); scheduleBoot(); LOG.info("Automatic scripts booting on instance up - job scheduled"); } private void bootWhenScriptsChanged() { - LOG.info("Automatic scripts booting on script changes - job scheduling"); + LOG.info("Automatic scripts booting on script changes - checking leadership"); + + if (!tryLock()) { + LOG.info("Instance is not scheduler leader - skipping job scheduling"); + return; + } + + LOG.info("Instance is scheduler leader - proceeding with job scheduling"); unscheduleBoot(); scheduleBoot(); LOG.info("Automatic scripts booting on script changes - job scheduled"); @@ -248,16 +276,6 @@ private ScheduleResult determineSchedule(Script script, ResourceResolver resourc } } - // Sling scheduler does not work during deployment on AEMaaCS, so we need to postpone boot job - private void 
deployJob() { - LOG.info("Instance deployment - job started"); - if (awaitInstanceHealthy( - "Instance deployment", config.healthCheckRetryCountDeployment(), config.healthCheckRetryInterval())) { - bootWhenInstanceUp(); - } - LOG.info("Instance deployment - job finished"); - } - private void bootJob() { LOG.info("Automatic scripts booting - job started"); unscheduleScripts(); @@ -465,4 +483,53 @@ private boolean awaitInstanceHealthy(String operation, long retryMaxCount, long } return true; } + + private boolean tryLock() { + if (!instanceInfo.isCluster()) { + return true; + } + try (ResourceResolver resourceResolver = ResolverUtils.contentResolver(resourceResolverFactory, null)) { + Locker locker = new Locker(resourceResolver); + if (locker.isLocked(SCHEDULER_LOCK_NAME)) { + if (locker.isLockStale(SCHEDULER_LOCK_NAME, config.leaderLockStaleTimeout())) { + LOG.warn("Instance removing stale leader lock"); + locker.unlock(SCHEDULER_LOCK_NAME); + } else { + return false; // other cluster node is the leader + } + } + try { + locker.lock(SCHEDULER_LOCK_NAME); + LOG.info("Instance is now scheduler leader"); + return true; + } catch (Exception e) { + LOG.warn("Instance cannot become scheduler leader", e); + return false; + } + } catch (LoginException e) { + LOG.error("Cannot access repository while checking scheduler leadership!", e); + return false; + } + } + + private boolean tryUnlock() { + if (!instanceInfo.isCluster()) { + return true; + } + try (ResourceResolver resourceResolver = ResolverUtils.contentResolver(resourceResolverFactory, null)) { + Locker locker = new Locker(resourceResolver); + if (locker.isLocked(SCHEDULER_LOCK_NAME)) { + locker.unlock(SCHEDULER_LOCK_NAME); + LOG.info("Instance is no longer scheduler leader"); + return true; + } + return false; // no leader lock present, nothing to release + } catch (LoginException e) { + LOG.error("Instance cannot access repository while releasing scheduler leadership!", e); + return false; + } catch (Exception e) { + 
LOG.warn("Instance failed to release scheduler leadership lock", e); + return false; + } + } }

AltStyle によって変換されたページ (->オリジナル) /