/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.scheduler;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.JobSchedulerUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker filter that intercepts message sends in order to support scheduled
 * delivery.
 * <ul>
 * <li>A message whose destination name starts (ignoring case) with
 * {@link ScheduledMessage#AMQ_SCHEDULER_MANAGEMENT_DESTINATION} is treated as
 * a management command (browse / remove / remove-all) and is not forwarded.</li>
 * <li>A message carrying a cron, period or delay scheduling property and no
 * scheduled id is marshalled and handed to the {@link JobScheduler} instead
 * of being forwarded.</li>
 * <li>Every other message is passed to the next broker unchanged.</li>
 * </ul>
 * As a {@link JobListener} this class receives fired jobs through
 * {@link #scheduledJob(String, ByteSequence)} and sends them on to the next
 * broker.
 */
public class SchedulerBroker extends BrokerFilter implements JobListener {
    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
    private static final IdGenerator ID_GENERATOR = new IdGenerator();

    /** Name used to look up this broker's scheduler in the store. */
    private static final String SCHEDULER_NAME = "JMS";
    /** Message property that carries the id of the job a message originated from. */
    private static final String SCHEDULED_JOB_ID_PROPERTY = "scheduledJobId";
    /** Minimum interval between two "store is full" log entries while a producer is blocked. */
    private static final long FULL_STORE_WARN_INTERVAL_MS = 30000L;

    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final AtomicBoolean started = new AtomicBoolean();
    private final WireFormat wireFormat = new OpenWireFormat();
    private final ConnectionContext context = new ConnectionContext();
    private final ProducerId producerId = new ProducerId();
    private final SystemUsage systemUsage;

    private final JobSchedulerStore store;
    private JobScheduler scheduler;

    /**
     * Creates the filter.
     *
     * @param brokerService supplies the system usage and the store OpenWire version
     * @param next          the broker this filter delegates to
     * @param store         the store the scheduler is obtained from; the code
     *                      tolerates {@code null}, in which case no scheduler
     *                      is ever created
     * @throws Exception declared for callers; propagated from the setup calls
     */
    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
        super(next);

        this.store = store;
        this.producerId.setConnectionId(ID_GENERATOR.generateId());
        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        this.context.setBroker(next);
        this.systemUsage = brokerService.getSystemUsage();

        wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
    }

    /**
     * Returns a new {@link JobSchedulerFacade} wrapping this broker.
     *
     * @return a facade onto this broker's scheduler
     * @throws Exception declared for callers
     */
    public synchronized JobScheduler getJobScheduler() throws Exception {
        return new JobSchedulerFacade(this);
    }

    /**
     * Marks this broker as started, initialises the internal scheduler and
     * then starts the next broker.
     */
    @Override
    public void start() throws Exception {
        this.started.set(true);
        getInternalScheduler();
        super.start();
    }

    /**
     * Stops the store and detaches from the scheduler if this broker was
     * started, then stops the next broker. The started flag is flipped
     * atomically, so the shutdown work runs at most once per start.
     */
    @Override
    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {

            if (this.store != null) {
                this.store.stop();
            }
            if (this.scheduler != null) {
                this.scheduler.removeListener(this);
                this.scheduler = null;
            }
        }
        super.stop();
    }

    /**
     * Routes a message to scheduler management, to the job scheduler, or to
     * the next broker, depending on its destination and scheduling properties.
     *
     * @param producerExchange the exchange the message arrived on
     * @param messageSend      the message being sent
     * @throws IllegalStateException if the message requires the scheduler but
     *                               none is available
     * @throws Exception             if property access, scheduling or the
     *                               downstream send fails
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
        ConnectionContext context = producerExchange.getConnectionContext();

        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);

        // Case-insensitive prefix match against the management destination name.
        String physicalName = messageSend.getDestination().getPhysicalName();
        boolean schedulerManagement = physicalName.regionMatches(true, 0,
            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());

        if (schedulerManagement) {
            handleManagementMessage(context, messageSend, jobId);
        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
            waitForSchedulerStoreSpace(context, messageSend);

            if (context.isInTransaction()) {
                // Defer scheduling until the transaction commits.
                context.getTransaction().addSynchronization(new Synchronization() {
                    @Override
                    public void afterCommit() throws Exception {
                        doSchedule(messageSend, cronValue, periodValue, delayValue);
                    }
                });
            } else {
                doSchedule(messageSend, cronValue, periodValue, delayValue);
            }
        } else {
            super.send(producerExchange, messageSend);
        }
    }

    /**
     * Executes the browse / remove / remove-all action carried by a message
     * addressed to the scheduler management destination. Messages without an
     * action property, or with an action that matches none of the handled
     * ones, are silently ignored.
     */
    private void handleManagementMessage(ConnectionContext context, Message messageSend, String jobId) throws Exception {
        JobScheduler scheduler = getInternalScheduler();
        ActiveMQDestination replyTo = messageSend.getReplyTo();

        String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
        if (action == null) {
            return;
        }

        Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
        Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
        // A time window applies only when both bounds are supplied.
        boolean ranged = startTime != null && endTime != null;

        if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
            requireAvailable(scheduler);
            if (ranged) {
                long start = toLong(startTime);
                long finish = toLong(endTime);
                for (Job job : scheduler.getAllJobs(start, finish)) {
                    sendScheduledJob(context, job, replyTo);
                }
            } else {
                for (Job job : scheduler.getAllJobs()) {
                    sendScheduledJob(context, job, replyTo);
                }
            }
        }

        if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
            requireAvailable(scheduler).remove(jobId);
        } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
            requireAvailable(scheduler);
            if (ranged) {
                long start = toLong(startTime);
                long finish = toLong(endTime);
                scheduler.removeAllJobs(start, finish);
            } else {
                scheduler.removeAllJobs();
            }
        }
    }

    /**
     * Blocks the calling producer while the job scheduler store reports that
     * it is full, logging at most once per {@link #FULL_STORE_WARN_INTERVAL_MS}.
     * Returns immediately when no job scheduler usage is configured or the
     * store is not full.
     *
     * @throws IOException if the connection starts stopping while blocked
     */
    private void waitForSchedulerStoreSpace(ConnectionContext context, Message messageSend) throws Exception {
        JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
        if (usage == null || !usage.isFull()) {
            return;
        }

        final String logMessage = "Job Scheduler Store is Full ("
            + usage.getPercentUsage() + "% of " + usage.getLimit()
            + "). Stopping producer (" + messageSend.getProducerId()
            + ") to prevent flooding of the job scheduler store."
            + " See http://activemq.apache.org/producer-flow-control.html for more info";

        long start = System.currentTimeMillis();
        long nextWarn = start;
        while (!usage.waitForSpace(1000)) {
            if (context.getStopping().get()) {
                throw new IOException("Connection closed, send aborted.");
            }

            long now = System.currentTimeMillis();
            if (now >= nextWarn) {
                LOG.info("{}: {} (blocking for: {}s)", new Object[]{ usage, logMessage, (now - start) / 1000 });
                nextWarn = now + FULL_STORE_WARN_INTERVAL_MS;
            }
        }
    }

    /**
     * Marshals a copy of the message (with its transaction id cleared) and
     * registers it with the scheduler under the message id.
     *
     * @throws IllegalStateException if no scheduler is available
     */
    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
        long delay = 0;
        long period = 0;
        int repeat = 0;
        String cronEntry = "";

        // clear transaction context
        Message msg = messageSend.copy();
        msg.setTransactionId(null);
        ByteSequence packet = wireFormat.marshal(msg);
        if (cronValue != null) {
            cronEntry = cronValue.toString();
        }
        if (periodValue != null) {
            period = toLong(periodValue);
        }
        if (delayValue != null) {
            delay = toLong(delayValue);
        }
        Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
        if (repeatValue != null) {
            repeat = toInt(repeatValue);
        }

        requireAvailable(getInternalScheduler()).schedule(msg.getMessageId().toString(),
            new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
    }

    /**
     * Job listener callback: unmarshals the stored message, strips its
     * scheduling properties, refreshes timestamp and expiration, and sends it
     * to the next broker. Failures are logged and not rethrown.
     *
     * @param id  the id of the job that fired
     * @param job the marshalled message stored for the job
     */
    @Override
    public void scheduledJob(String id, ByteSequence job) {
        ByteSequence packet = new ByteSequence(job.getData(), job.getOffset(), job.getLength());
        try {
            Message messageSend = (Message) wireFormat.unmarshal(packet);
            messageSend.setOriginalTransactionId(null);
            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
            String cronStr = cronValue != null ? cronValue.toString() : null;
            int repeat = 0;
            if (repeatValue != null) {
                repeat = toInt(repeatValue);
            }

            if (repeat != 0 || (cronStr != null && cronStr.length() > 0)) {
                // create a unique id - the original message could be sent
                // lots of times
                messageSend.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
            }

            // Add the jobId as a property
            messageSend.setProperty(SCHEDULED_JOB_ID_PROPERTY, id);

            // if this goes across a network - we don't want it rescheduled
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);

            long oldTimestamp = messageSend.getTimestamp();
            long oldExpiration = messageSend.getExpiration();
            if (oldTimestamp > 0 && oldExpiration > 0) {
                // Keep the original time-to-live, measured from now rather
                // than from the original send time.
                long newTimeStamp = System.currentTimeMillis();
                long timeToLive = oldExpiration - oldTimestamp;
                long expiration = timeToLive + newTimeStamp;

                if (expiration > oldExpiration) {
                    if (timeToLive > 0 && expiration > 0) {
                        messageSend.setExpiration(expiration);
                    }
                    messageSend.setTimestamp(newTimeStamp);
                    LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ messageSend.getMessageId(), oldTimestamp, newTimeStamp });
                }
            }

            // Repackage the message contents prior to send now that all updates are complete.
            messageSend.beforeMarshall(wireFormat);

            final ProducerBrokerExchange producerExchange = newExchange(context);
            // super.send, not send: the message must bypass this filter's own
            // scheduling logic and go straight to the next broker.
            super.send(producerExchange, messageSend);
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", id, e);
        }
    }

    /**
     * Returns the scheduler, creating it from the store on first use.
     *
     * @return the scheduler, or {@code null} when this broker is not started
     *         or no store was supplied
     * @throws Exception if the store cannot provide or start the scheduler
     */
    protected synchronized JobScheduler getInternalScheduler() throws Exception {
        if (this.started.get()) {
            if (this.scheduler == null && store != null) {
                this.scheduler = store.getJobScheduler(SCHEDULER_NAME);
                this.scheduler.addListener(this);
                this.scheduler.startDispatching();
            }
            return this.scheduler;
        }
        return null;
    }

    /**
     * Sends a non-persistent copy of a stored job to {@code replyTo} in
     * answer to a browse request. Producer flow control on the context is
     * switched off for the send and restored afterwards. Failures are logged
     * and not rethrown.
     *
     * @param context the connection context of the browsing client
     * @param job     the job to report
     * @param replyTo the destination the job is sent to
     * @throws Exception declared for callers; failures inside the send are
     *                   caught and logged
     */
    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {

        ByteSequence packet = new ByteSequence(job.getPayload());
        try {
            Message msg = (Message) this.wireFormat.unmarshal(packet);
            msg.setOriginalTransactionId(null);
            msg.setPersistent(false);
            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            msg.setDestination(replyTo);
            msg.setResponseRequired(false);
            msg.setProducerId(this.producerId);

            // Add the jobId as a property
            msg.setProperty(SCHEDULED_JOB_ID_PROPERTY, job.getJobId());

            final boolean originalFlowControl = context.isProducerFlowControl();
            final ProducerBrokerExchange producerExchange = newExchange(context);
            try {
                context.setProducerFlowControl(false);
                this.next.send(producerExchange, msg);
            } finally {
                context.setProducerFlowControl(originalFlowControl);
            }
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
        }
    }

    /** Builds a mutable exchange with a fresh producer state bound to the given context. */
    private static ProducerBrokerExchange newExchange(ConnectionContext context) {
        final ProducerBrokerExchange exchange = new ProducerBrokerExchange();
        exchange.setConnectionContext(context);
        exchange.setMutable(true);
        exchange.setProducerState(new ProducerState(new ProducerInfo()));
        return exchange;
    }

    /** Returns the scheduler, or fails with a descriptive error instead of a later NullPointerException. */
    private static JobScheduler requireAvailable(JobScheduler scheduler) {
        if (scheduler == null) {
            throw new IllegalStateException(
                "Job scheduler is not available: the broker is not started or no job scheduler store is configured");
        }
        return scheduler;
    }

    /** Converts a message property value to {@code long}; a null conversion result fails on unboxing. */
    private static long toLong(Object value) {
        return (Long) TypeConversionSupport.convert(value, Long.class);
    }

    /** Converts a message property value to {@code int}; a null conversion result fails on unboxing. */
    private static int toInt(Object value) {
        return (Integer) TypeConversionSupport.convert(value, Integer.class);
    }
}