/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.scheduler;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.MessageFormatException;

import org.apache.activemq.broker.scheduler.CronParser;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobScheduler} whose jobs are kept in a {@link BTreeIndex} keyed by execution time.
 *
 * The public scheduling and removal methods only build a command object and hand it to the
 * owning {@link JobSchedulerStoreImpl}; the <code>process(...)</code> methods in this class apply
 * such commands to the index (presumably invoked by the store once the command has been
 * journaled - confirm against JobSchedulerStoreImpl). A dispatch thread started by
 * {@link #startDispatching()} runs {@link #mainLoop()} to fire jobs that have come due.
 */
public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {

    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
    private final JobSchedulerStoreImpl store;
    private final AtomicBoolean running = new AtomicBoolean();
    private String name;
    private BTreeIndex<Long, List<JobLocation>> index;
    private Thread thread;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    private final ScheduleTime scheduleTime = new ScheduleTime();

    /**
     * Creates a scheduler bound to the given store.
     *
     * @param store
     *        the store used to persist commands, read payloads and lock the index.
     */
    JobSchedulerImpl(JobSchedulerStoreImpl store) {
        this.store = store;
    }

    /**
     * @param name
     *        the name of this scheduler; it is written into every command this scheduler creates.
     */
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public void addListener(JobListener l) {
        this.jobListeners.add(l);
    }

    @Override
    public void removeListener(JobListener l) {
        this.jobListeners.remove(l);
    }

    @Override
    public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
        doSchedule(jobId, payload, "", 0, delay, 0);
    }

    @Override
    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
        doSchedule(jobId, payload, cronEntry, 0, 0, 0);
    }

    @Override
    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period,
        final int repeat) throws IOException {
        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
    }

    @Override
    public void remove(final long time) throws IOException {
        doRemoveRange(time, time);
    }

    @Override
    public void remove(final String jobId) throws IOException {
        // -1 means "no execution time given": the job is looked up by its id alone.
        doRemove(-1, jobId);
    }

    @Override
    public void removeAllJobs() throws IOException {
        doRemoveRange(0, Long.MAX_VALUE);
    }

    @Override
    public void removeAllJobs(final long start, final long finish) throws IOException {
        doRemoveRange(start, finish);
    }

    /**
     * @return the key of the first entry in the index, or -1 when the index has no entries.
     */
    @Override
    public long getNextScheduleTime() throws IOException {
        this.store.readLockIndex();
        try {
            Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
            return first != null ? first.getKey() : -1L;
        } finally {
            this.store.readUnlockIndex();
        }
    }

    /**
     * @return the jobs stored under the first entry of the index, each with its payload loaded;
     *         an empty list when the index has no entries.
     */
    @Override
    public List<Job> getNextScheduleJobs() throws IOException {
        final List<Job> result = new ArrayList<>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx);
                    if (first != null) {
                        for (JobLocation jl : first.getValue()) {
                            ByteSequence bs = getPayload(jl.getLocation());
                            Job job = new JobImpl(jl, bs);
                            result.add(job);
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /**
     * Reads the first index entry under the index read lock.
     *
     * @return the first entry of the index, or null when the index is empty or the store is
     *         stopped or stopping.
     */
    private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
        this.store.readLockIndex();
        try {
            if (!this.store.isStopped() && !this.store.isStopping()) {
                return this.index.getFirst(this.store.getPageFile().tx());
            }
        } finally {
            this.store.readUnlockIndex();
        }
        return null;
    }

    /**
     * @return every job in the index, in index iteration order, each with its payload loaded.
     */
    @Override
    public List<Job> getAllJobs() throws IOException {
        final List<Job> result = new ArrayList<>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    // Iterate under the transaction handed to this closure, exactly as
                    // getAllJobs(start, finish) does, instead of opening a second one.
                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx);
                    while (iter.hasNext()) {
                        Map.Entry<Long, List<JobLocation>> next = iter.next();
                        if (next != null) {
                            for (JobLocation jl : next.getValue()) {
                                ByteSequence bs = getPayload(jl.getLocation());
                                Job job = new JobImpl(jl, bs);
                                result.add(job);
                            }
                        } else {
                            break;
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /**
     * @return the jobs found by iterating the index from <code>start</code> until an entry with a
     *         key greater than <code>finish</code> is met, each with its payload loaded.
     */
    @Override
    public List<Job> getAllJobs(final long start, final long finish) throws IOException {
        final List<Job> result = new ArrayList<>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start);
                    while (iter.hasNext()) {
                        Map.Entry<Long, List<JobLocation>> next = iter.next();
                        if (next != null && next.getKey().longValue() <= finish) {
                            for (JobLocation jl : next.getValue()) {
                                ByteSequence bs = getPayload(jl.getLocation());
                                Job job = new JobImpl(jl, bs);
                                result.add(job);
                            }
                        } else {
                            break;
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /**
     * Builds an add-job command and passes it to the store.
     *
     * The start time is the current time rounded up to the next 500ms boundary. The first
     * execution time is the cron-derived time when a cron entry is given (otherwise the start
     * time) plus <code>delay</code> when it is positive, or plus <code>period</code> otherwise.
     *
     * @param jobId
     *        the id of the job.
     * @param payload
     *        the job payload.
     * @param cronEntry
     *        a cron expression, or null / empty for none.
     * @param delay
     *        the initial delay in milliseconds.
     * @param period
     *        the repeat period in milliseconds.
     * @param repeat
     *        the repeat count stored with the job.
     *
     * @throws IOException if the cron entry cannot be parsed or the store rejects the command.
     */
    private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat)
        throws IOException {
        long startTime = System.currentTimeMillis();
        // round startTime - so we can schedule more jobs at the same time
        startTime = ((startTime + 500) / 500) * 500;

        long time = 0;
        if (cronEntry != null && cronEntry.length() > 0) {
            try {
                time = CronParser.getNextScheduledTime(cronEntry, startTime);
            } catch (MessageFormatException e) {
                // Keep the original exception as the cause so the parse failure can be traced.
                throw new IOException(e.getMessage(), e);
            }
        }

        if (time == 0) {
            // start time not set by CRON - so set it to the current time
            time = startTime;
        }

        if (delay > 0) {
            time += delay;
        } else {
            time += period;
        }

        KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
        newJob.setScheduler(name);
        newJob.setJobId(jobId);
        newJob.setStartTime(startTime);
        newJob.setCronEntry(cronEntry);
        newJob.setDelay(delay);
        newJob.setPeriod(period);
        newJob.setRepeat(repeat);
        newJob.setNextExecutionTime(time);
        newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength()));

        this.store.store(newJob);
    }

    /**
     * Builds a reschedule command for the given job and passes it to the store.
     *
     * @param jobId
     *        the id of the job being rescheduled.
     * @param executionTime
     *        the time the job is currently scheduled at.
     * @param nextExecutionTime
     *        the time the job should run next.
     * @param rescheduledCount
     *        the new value of the job's reschedule counter.
     *
     * @throws IOException if the store fails to accept the command.
     */
    private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
        KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
        update.setScheduler(name);
        update.setJobId(jobId);
        update.setExecutionTime(executionTime);
        update.setNextExecutionTime(nextExecutionTime);
        update.setRescheduledCount(rescheduledCount);
        this.store.store(update);
    }

    /**
     * Issues one remove command per job in the list, all for the same execution time.
     */
    private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException {
        for (JobLocation job : jobs) {
            doRemove(executionTime, job.getJobId());
        }
    }

    /**
     * Builds a remove command for a single job and passes it to the store.
     *
     * @param executionTime
     *        the time the job is scheduled at, or -1 to have the job located by id.
     * @param jobId
     *        the id of the job to remove.
     */
    private void doRemove(long executionTime, final String jobId) throws IOException {
        KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand();
        remove.setScheduler(name);
        remove.setJobId(jobId);
        remove.setNextExecutionTime(executionTime);
        this.store.store(remove);
    }

    /**
     * Builds a command that removes every job scheduled in [start, end] and passes it to the store.
     */
    private void doRemoveRange(long start, long end) throws IOException {
        KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand();
        destroy.setScheduler(name);
        destroy.setStartTime(start);
        destroy.setEndTime(end);
        this.store.store(destroy);
    }

    /**
     * Adds a new Scheduled job to the index. Must be called under index lock.
     *
     * This method must ensure that a duplicate add is not processed into the scheduler. On index
     * recover some adds may be replayed and we don't allow more than one instance of a JobId to
     * exist at any given scheduled time, so filter these out to ensure idempotence.
     *
     * @param tx
     *        Transaction in which the update is performed.
     * @param command
     *        The new scheduled job command to process.
     * @param location
     *        The location where the add command is stored in the journal.
     *
     * @throws IOException if an error occurs updating the index.
     */
    protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException {
        JobLocation jobLocation = new JobLocation(location);
        jobLocation.setJobId(command.getJobId());
        jobLocation.setStartTime(command.getStartTime());
        jobLocation.setCronEntry(command.getCronEntry());
        jobLocation.setDelay(command.getDelay());
        jobLocation.setPeriod(command.getPeriod());
        jobLocation.setRepeat(command.getRepeat());

        long nextExecutionTime = command.getNextExecutionTime();

        List<JobLocation> values = null;
        jobLocation.setNextTime(nextExecutionTime);
        if (this.index.containsKey(tx, nextExecutionTime)) {
            values = this.index.remove(tx, nextExecutionTime);
        }
        if (values == null) {
            values = new ArrayList<>();
        }

        // There can never be more than one instance of the same JobId scheduled at any
        // given time, when it happens its probably the result of index recovery and this
        // method must be idempotent so check for it first.
        if (!values.contains(jobLocation)) {
            values.add(jobLocation);

            // Reference the log file where the add command is stored to prevent GC.
            this.store.incrementJournalCount(tx, location);
            this.index.put(tx, nextExecutionTime, values);
            this.scheduleTime.newJob();
        } else {
            // Duplicate add: restore the list that was taken out of the index above.
            this.index.put(tx, nextExecutionTime, values);
            LOG.trace("Job {} already in scheduler at this time {}",
                      jobLocation.getJobId(), jobLocation.getNextTime());
        }
    }

    /**
     * Reschedules a Job after it has been fired.
     *
     * For jobs that are repeating this method updates the job in the index by adding it to the
     * jobs list for the new execution time. If the job is not a cron type job then this method
     * will reduce the repeat counter if the job has a fixed number of repeats set. The Job will
     * be removed from the jobs list it just executed on.
     *
     * This method must also update the value of the last update location in the JobLocation
     * instance so that the checkpoint worker doesn't drop the log file in which that command lives.
     *
     * This method must ensure that a reschedule command that references a job that doesn't exist
     * does not cause an error since it's possible that on recover the original add might be gone
     * and so the job should not reappear in the scheduler. In that case the jobs that are
     * scheduled at the referenced execution time are left in the index untouched.
     *
     * @param tx
     *        The TX under which the index is updated.
     * @param command
     *        The reschedule command to process.
     * @param location
     *        The location in the index where the reschedule command was stored.
     *
     * @throws IOException if an error occurs during the reschedule.
     */
    protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException {
        JobLocation result = null;
        final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime());
        if (current != null) {
            for (int i = 0; i < current.size(); i++) {
                JobLocation jl = current.get(i);
                if (jl.getJobId().equals(command.getJobId())) {
                    current.remove(i);
                    result = jl;
                    break;
                }
            }

            // The list was taken out of the index above, so whatever remains has to go back
            // whether or not the target job was found; otherwise a reschedule for an unknown
            // job would silently drop every other job scheduled at this time.
            if (!current.isEmpty()) {
                this.index.put(tx, command.getExecutionTime(), current);
            }
        } else {
            LOG.debug("Process reschedule command for job {} non-existent executime time {}.",
                      command.getJobId(), command.getExecutionTime());
        }

        if (result != null) {
            Location previousUpdate = result.getLastUpdate();

            List<JobLocation> target = null;
            result.setNextTime(command.getNextExecutionTime());
            result.setLastUpdate(location);
            result.setRescheduledCount(command.getRescheduledCount());
            if (!result.isCron() && result.getRepeat() > 0) {
                result.setRepeat(result.getRepeat() - 1);
            }
            if (this.index.containsKey(tx, command.getNextExecutionTime())) {
                target = this.index.remove(tx, command.getNextExecutionTime());
            }
            if (target == null) {
                target = new ArrayList<>();
            }
            target.add(result);

            // Track the location of the last reschedule command and release the log file
            // reference for the previous one if there was one.
            this.store.incrementJournalCount(tx, location);
            if (previousUpdate != null) {
                this.store.decrementJournalCount(tx, previousUpdate);
            }

            this.index.put(tx, command.getNextExecutionTime(), target);
            this.scheduleTime.newJob();
        } else {
            LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.",
                      command.getJobId(), command.getExecutionTime());
        }
    }

    /**
     * Removes a scheduled job from the scheduler.
     *
     * The remove operation can be of two forms. The first is that there is a job Id but no set time
     * (-1) in which case the jobs index is searched until the target job Id is located. The alternate
     * form is that a job Id and execution time are both set in which case the given time is checked
     * for a job matching that Id. In either case once an execution time is identified the job is
     * removed and the index updated.
     *
     * This method should ensure that if the matching job is not found that no error results as it
     * is possible that on a recover the initial add command could be lost so the job may not be
     * rescheduled.
     *
     * @param tx
     *        The transaction under which the index is updated.
     * @param command
     *        The remove command to process.
     * @param location
     *        The location of the remove command in the Journal.
     *
     * @throws IOException if an error occurs while updating the scheduler index.
     */
    void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException {

        // Case 1: JobId and no time value means find the job and remove it.
        // Case 2: JobId and a time value means find exactly this scheduled job.

        long executionTime = command.getNextExecutionTime();

        List<JobLocation> values = null;

        if (executionTime == -1) {
            // Finish the scan before touching the index: removing while the iterator is still
            // in use, and carrying on to later matches, would discard the lists removed earlier.
            Long located = findFirstExecutionTime(tx, command.getJobId());
            if (located != null) {
                executionTime = located;
                values = this.index.remove(tx, executionTime);
            }
        } else {
            values = this.index.remove(tx, executionTime);
        }

        JobLocation removed = null;

        // Remove the job and update the index if there are any other jobs scheduled at this time.
        if (values != null) {
            for (JobLocation job : values) {
                if (job.getJobId().equals(command.getJobId())) {
                    removed = job;
                    values.remove(removed);
                    break;
                }
            }

            if (!values.isEmpty()) {
                this.index.put(tx, executionTime, values);
            }
        }

        if (removed != null) {
            LOG.trace("{} removed from scheduler {}", removed, this);

            // Remove the references for add and reschedule commands for this job
            // so that those logs can be GC'd when free.
            this.store.decrementJournalCount(tx, removed.getLocation());
            if (removed.getLastUpdate() != null) {
                this.store.decrementJournalCount(tx, removed.getLastUpdate());
            }

            // now that the job is removed from the index we can store the remove info and
            // then dereference the log files that hold the initial add command and the most
            // recent update command. If the remove and the add that created the job are in
            // the same file we don't need to track it and just let a normal GC of the logs
            // remove it when the log is unreferenced.
            if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
                this.store.referenceRemovedLocation(tx, location, removed);
            }
        }
    }

    /**
     * Scans the index in iteration order for the first entry holding a job with the given id.
     * The index is not modified.
     *
     * @param tx
     *        The transaction under which the index is read.
     * @param jobId
     *        The job id to look for.
     *
     * @return the key of the first entry that contains the job, or null if no entry does.
     *
     * @throws IOException if an error occurs while reading the index.
     */
    private Long findFirstExecutionTime(final Transaction tx, final String jobId) throws IOException {
        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
            Map.Entry<Long, List<JobLocation>> entry = i.next();
            List<JobLocation> candidates = entry.getValue();
            if (candidates != null) {
                for (JobLocation jl : candidates) {
                    if (jl.getJobId().equals(jobId)) {
                        LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), jobId);
                        return entry.getKey();
                    }
                }
            }
        }
        return null;
    }

    /**
     * Removes all scheduled jobs within a given time range.
     *
     * The method can be used to clear the entire scheduler index by specifying a range that
     * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by
     * setting start and end time to the same value.
     *
     * @param tx
     *        The transaction under which the index is updated.
     * @param command
     *        The remove command to process.
     * @param location
     *        The location of the remove command in the Journal.
     *
     * @throws IOException if an error occurs while updating the scheduler index.
     */
    protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException {
        removeInRange(tx, command.getStartTime(), command.getEndTime(), location);
    }

    /**
     * Removes all jobs from the schedulers index. Must be called with the index locked.
     *
     * @param tx
     *        The transaction under which the index entries for this scheduler are removed.
     *
     * @throws IOException if an error occurs removing the jobs from the scheduler index.
     */
    protected void removeAll(Transaction tx) throws IOException {
        this.removeInRange(tx, 0, Long.MAX_VALUE, null);
    }

    /**
     * Removes all scheduled jobs within the target range.
     *
     * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE]
     * or it can be used to remove all jobs at a given scheduled time by passing the same time value
     * for both start and end. If the optional location parameter is set then this method will update
     * the store's remove location tracker with the location value and the Jobs that are being removed.
     * When no location is given the entries are removed from the index without any journal
     * reference counts being touched.
     *
     * This method must be called with the store index locked for writes.
     *
     * @param tx
     *        The transaction under which the index is to be updated.
     * @param start
     *        The start time for the remove operation.
     * @param finish
     *        The end time for the remove operation.
     * @param location (optional)
     *        The location of the remove command that triggered this remove.
     *
     * @throws IOException if an error occurs during the remove operation.
     */
    protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
        // Collect the keys first so that the index is not modified while it is being iterated.
        List<Long> keys = new ArrayList<>();
        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
            Map.Entry<Long, List<JobLocation>> entry = i.next();
            if (entry.getKey().longValue() <= finish) {
                keys.add(entry.getKey());
            } else {
                break;
            }
        }

        for (Long executionTime : keys) {
            List<JobLocation> values = this.index.remove(tx, executionTime);
            if (location != null) {
                for (JobLocation job : values) {
                    LOG.trace("Removing {} scheduled at: {}", job, executionTime);

                    // Remove the references for add and reschedule commands for this job
                    // so that those logs can be GC'd when free.
                    this.store.decrementJournalCount(tx, job.getLocation());
                    if (job.getLastUpdate() != null) {
                        this.store.decrementJournalCount(tx, job.getLastUpdate());
                    }

                    // now that the job is removed from the index we can store the remove info and
                    // then dereference the log files that hold the initial add command and the most
                    // recent update command. If the remove and the add that created the job are in
                    // the same file we don't need to track it and just let a normal GC of the logs
                    // remove it when the log is unreferenced.
                    if (job.getLocation().getDataFileId() != location.getDataFileId()) {
                        this.store.referenceRemovedLocation(tx, location, job);
                    }
                }
            }
        }
    }

    /**
     * Removes a Job from the index using it's Id value and the time it is currently set to
     * be executed. This method will only remove the Job if it is found at the given execution
     * time.
     *
     * This method must be called under index lock.
     *
     * @param tx
     *        the transaction under which this method is being executed.
     * @param jobId
     *        the target Job Id to remove.
     * @param executionTime
     *        the scheduled time that for the Job Id that is being removed.
     *
     * @return true if the Job was removed or false if not found at the given time.
     *
     * @throws IOException if an error occurs while removing the Job.
     */
    protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException {
        List<JobLocation> jobs = this.index.remove(tx, executionTime);
        if (jobs == null) {
            // Nothing is scheduled at this time, so the job cannot be here.
            return false;
        }

        boolean result = false;
        Iterator<JobLocation> jobsIter = jobs.iterator();
        while (jobsIter.hasNext()) {
            JobLocation job = jobsIter.next();
            if (job.getJobId().equals(jobId)) {
                jobsIter.remove();
                // Remove the references for add and reschedule commands for this job
                // so that those logs can be GC'd when free.
                this.store.decrementJournalCount(tx, job.getLocation());
                if (job.getLastUpdate() != null) {
                    this.store.decrementJournalCount(tx, job.getLastUpdate());
                }
                result = true;
                break;
            }
        }

        // Return the remaining jobs to the index. An empty list is not put back: the dispatch
        // loop only issues removes for jobs it finds in an entry, so it could never clear it.
        if (!jobs.isEmpty()) {
            this.index.put(tx, executionTime, jobs);
        }

        return result;
    }

    /**
     * Walks the Scheduled Job Tree and collects the JobLocation of every scheduled job; each
     * one carries the add location and last update location of its job.
     *
     * This method must be called with the index locked.
     *
     * @param tx
     *        the transaction under which this operation was invoked.
     *
     * @return a list of all JobLocation values held in this JobSchedulerImpl's index.
     *
     * @throws IOException if an error occurs walking the scheduler tree.
     */
    protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
        List<JobLocation> references = new ArrayList<>();

        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
            Map.Entry<Long, List<JobLocation>> entry = i.next();
            references.addAll(entry.getValue());
        }

        return references;
    }

    /**
     * Body of the dispatch thread: runs {@link #mainLoop()} and, if the loop ends while the
     * scheduler is still flagged as running, stops this service.
     */
    @Override
    public void run() {
        try {
            mainLoop();
        } catch (Throwable e) {
            if (this.running.get() && isStarted()) {
                LOG.error("{} Caught exception in mainloop", this, e);
            }
        } finally {
            if (running.get()) {
                try {
                    stop();
                } catch (Exception e) {
                    LOG.error("Failed to stop {}", this, e);
                }
            }
        }
    }

    @Override
    public String toString() {
        return "JobScheduler: " + this.name;
    }

    /**
     * Dispatch loop. While the scheduler is running it repeatedly reads the first index entry,
     * fires and reschedules (or removes) its jobs when that entry is due, and then pauses for
     * the wait time held in {@link ScheduleTime}. Any exception raised in an iteration is logged
     * and the store is stopped.
     */
    protected void mainLoop() {
        while (this.running.get()) {
            this.scheduleTime.clearNewJob();
            try {
                long currentTime = System.currentTimeMillis();

                // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as
                // needed before firing the job event.
                Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
                if (first != null) {
                    List<JobLocation> list = new ArrayList<>(first.getValue());
                    List<JobLocation> toRemove = new ArrayList<>(list.size());
                    final long executionTime = first.getKey();
                    long nextExecutionTime = 0;
                    if (executionTime <= currentTime) {
                        for (final JobLocation job : list) {

                            if (!running.get()) {
                                break;
                            }

                            int repeat = job.getRepeat();
                            nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
                            long waitTime = nextExecutionTime - currentTime;
                            this.scheduleTime.setWaitTime(waitTime);
                            if (!job.isCron()) {
                                fireJob(job);
                                if (repeat != 0) {
                                    // Reschedule for the next time, the scheduler will take care of
                                    // updating the repeat counter on the update.
                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
                                } else {
                                    toRemove.add(job);
                                }
                            } else {
                                if (repeat == 0) {
                                    // This is a non-repeating Cron entry so we can fire and forget it.
                                    fireJob(job);
                                }

                                if (nextExecutionTime > currentTime) {
                                    // Reschedule the cron job as a new event, if the cron entry signals
                                    // a repeat then it will be stored separately and fired as a normal
                                    // event with decrementing repeat.
                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);

                                    if (repeat != 0) {
                                        // we have a separate schedule to run at this time
                                        // so the cron job is used to set of a separate schedule
                                        // hence we won't fire the original cron job to the
                                        // listeners but we do need to start a separate schedule
                                        String jobId = ID_GENERATOR.generateId();
                                        ByteSequence payload = getPayload(job.getLocation());
                                        schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
                                        waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
                                        this.scheduleTime.setWaitTime(waitTime);
                                    }
                                } else {
                                    toRemove.add(job);
                                }
                            }
                        }

                        // now remove all jobs that have not been rescheduled from this execution
                        // time, if there are no more entries in that time it will be removed.
                        doRemove(executionTime, toRemove);

                        // If there is a job that should fire before the currently set wait time
                        // we need to reset wait time otherwise we'll miss it.
                        Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
                        if (nextUp != null) {
                            final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
                            if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
                                this.scheduleTime.setWaitTime(timeUntilNextScheduled);
                            }
                        }
                    } else {
                        this.scheduleTime.setWaitTime(executionTime - currentTime);
                    }
                }

                this.scheduleTime.pause();
            } catch (Exception ioe) {
                LOG.error("{} Failed to schedule job", this.name, ioe);
                try {
                    this.store.stop();
                } catch (Exception e) {
                    LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e);
                }
            }
        }
    }

    /**
     * Loads the payload of the given job from the store and hands it, together with the job id,
     * to every registered listener.
     *
     * @param job
     *        the job to fire.
     *
     * @throws IllegalStateException declared by the store's payload lookup.
     * @throws IOException if the payload cannot be read.
     */
    void fireJob(JobLocation job) throws IllegalStateException, IOException {
        LOG.debug("Firing: {}", job);
        ByteSequence bs = this.store.getPayload(job.getLocation());
        for (JobListener l : jobListeners) {
            l.scheduledJob(job.getJobId(), bs);
        }
    }

    /**
     * Starts the daemon dispatch thread, unless this service has not been started or the
     * thread has already been started.
     */
    @Override
    public void startDispatching() throws Exception {
        if (!this.running.get()) {
            return;
        }

        if (started.compareAndSet(false, true)) {
            this.thread = new Thread(this, "JobScheduler:" + this.name);
            this.thread.setDaemon(true);
            this.thread.start();
        }
    }

    /**
     * Wakes the dispatch thread and waits up to three seconds for it to finish.
     */
    @Override
    public void stopDispatching() throws Exception {
        if (started.compareAndSet(true, false)) {
            this.scheduleTime.wakeup();
            Thread t = this.thread;
            this.thread = null;
            // run() calls stop() from the dispatch thread itself; joining the current thread
            // could only ever time out, so skip the join in that case.
            if (t != null && t != Thread.currentThread()) {
                t.join(3000);
            }
        }
    }

    @Override
    protected void doStart() throws Exception {
        this.running.set(true);
    }

    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        this.running.set(false);
        stopDispatching();
    }

    /**
     * Reads the payload stored at the given journal location through the store.
     */
    private ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
        return this.store.getPayload(location);
    }

    /**
     * Computes when the given job should run next.
     *
     * @param job
     *        the job being evaluated.
     * @param currentTime
     *        the reference time in milliseconds.
     * @param repeat
     *        not used by this method; the repeat count is read from the job itself.
     *
     * @return the next time produced by the cron parser when the job has a cron entry;
     *         otherwise <code>currentTime</code> plus the job's period when its repeat count
     *         is non-zero; otherwise <code>currentTime</code>.
     *
     * @throws MessageFormatException if the cron entry cannot be parsed.
     */
    long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
        long result = currentTime;
        String cron = job.getCronEntry();
        if (cron != null && cron.length() > 0) {
            result = CronParser.getNextScheduledTime(cron, result);
        } else if (job.getRepeat() != 0) {
            result += job.getPeriod();
        }
        return result;
    }

    /**
     * Creates the index for this scheduler on a page newly allocated from the transaction.
     */
    void createIndexes(Transaction tx) throws IOException {
        this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId());
    }

    /**
     * Configures the key and value marshallers of the index and loads it.
     */
    void load(Transaction tx) throws IOException {
        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
        this.index.load(tx);
    }

    /**
     * Restores this scheduler from the stream: the name first, then the page id of its index.
     * This is the counterpart of {@link #write(DataOutput)}; the index is not loaded here.
     */
    void read(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong());
        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
    }

    /**
     * Writes the name of this scheduler followed by the page id of its index.
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(this.index.getPageId());
    }

    /**
     * Holds the time the dispatch loop sleeps between passes and lets other threads cut that
     * sleep short when a job is added or dispatching is stopped.
     *
     * NOTE(review): <code>newJob</code> and <code>waitTime</code> are read and written without
     * holding <code>mutex</code>; confirm that the callers' locking makes this safe.
     */
    static class ScheduleTime {
        private static final int DEFAULT_WAIT = 500;
        private static final int DEFAULT_NEW_JOB_WAIT = 100;
        private boolean newJob;
        private long waitTime = DEFAULT_WAIT;
        private final Object mutex = new Object();

        /**
         * @return the waitTime
         */
        long getWaitTime() {
            return this.waitTime;
        }

        /**
         * Sets the wait time unless a new job has been flagged since the last
         * {@link #clearNewJob()}. Values that are not positive are replaced by the default wait.
         *
         * @param waitTime
         *        the waitTime to set
         */
        void setWaitTime(long waitTime) {
            if (!this.newJob) {
                this.waitTime = waitTime > 0 ? waitTime : DEFAULT_WAIT;
            }
        }

        /**
         * Blocks the caller for up to the current wait time, or until {@link #wakeup()} is called.
         */
        void pause() {
            synchronized (mutex) {
                try {
                    mutex.wait(this.waitTime);
                } catch (InterruptedException ignored) {
                    // Treated like a wakeup: the dispatch loop re-checks its running flag.
                }
            }
        }

        /**
         * Flags that a job was added, shortens the wait time and wakes a paused dispatch loop.
         */
        void newJob() {
            this.newJob = true;
            this.waitTime = DEFAULT_NEW_JOB_WAIT;
            wakeup();
        }

        /**
         * Clears the new job flag so that {@link #setWaitTime(long)} takes effect again.
         */
        void clearNewJob() {
            this.newJob = false;
        }

        /**
         * Wakes every thread blocked in {@link #pause()}.
         */
        void wakeup() {
            synchronized (this.mutex) {
                mutex.notifyAll();
            }
        }
    }
}