jmeter源码学习四(StandardJMeterEngine)

StandardJMeterEngine源码

StandardJMeterEngine 是 JMeter 默认的单机压测引擎,负责运行 JMeter 测试:既可在本地 GUI 和非 GUI 模式下直接调用,也可在服务器模式下由 RemoteJMeterEngineImpl 启动。其逻辑关系如下:
avatar
简要解读:

  • HashTree是依赖的数据结构;
  • SearchByClass 用来查找 HashTree 中的所有节点,并把节点实例化为真正的对象,例如图中 TestPlan/ThreadGroup/JavaSampler/ResultCollector 在 HashTree 中本来都只是配置,全部是通过 SearchByClass 实例化的;
  • 实例化出来的对象如果是 TestStateListener 类型,则会回调其生命周期函数:测试开始前调用 testStarted,测试结束时调用 testEnded。比如 ResultCollector 就是该类型的一种,它在测试结束时通过 testEnded 回调完成 report 的写入;
  • PreCompiler 用来解析 Arguments,把 TestPlan 节点中配置的参数作为 JMeterVariables 加入到测试线程上下文中;
  • ThreadGroup 用来管理一组线程,包括线程的个数/启动/关闭等;
  • StopTest 作为其内部类对外不可见,作为一个 Runnable,作用是异步停止测试,stopTest方法也是通过该内部类实现的。

JMeterEngine 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * This interface is implemented by classes that can run JMeter tests.
 */
public interface JMeterEngine {
/**
 * Configure the engine with the test plan it is going to run.
 * @param testPlan the test plan
 */
void configure(HashTree testPlan);

/**
 * Runs the test.
 * @throws JMeterEngineException if an error occurs
 */
void runTest() throws JMeterEngineException;

/**
 * Stop test immediately, interrupting current samplers.
 * The default implementation delegates to {@code stopTest(true)}.
 */
default void stopTest() {
stopTest(true);
}
/**
 * Stop the test, either immediately or gracefully.
 * @param now boolean that tells whether the stop is immediate (interrupt) or not (wait for current sample end)
 */
void stopTest(boolean now);

/**
 * Stop test if running.
 */
void reset();

/**
 * Set properties on the engine.
 * @param p the properties to set
 */
void setProperties(Properties p);

/**
 * Exit engine.
 */
void exit();

/**
 * @return boolean Flag to show whether engine is active (true when test is running). Set to false at end of test
 */
boolean isActive();
}

StandardJMeterEngine 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
public class StandardJMeterEngine implements JMeterEngine, Runnable {

/** Logger shared by the engine and its inner StopTest class. */
private static final Logger log = LoggerFactory.getLogger(StandardJMeterEngine.class);

// Should we exit at end of the test? (only applies to server, because host is non-null)
// Read once from the "server.exitaftertest" property; defaults to false.
private static final boolean EXIT_AFTER_TEST =
JMeterUtils.getPropDefault("server.exitaftertest", false); // $NON-NLS-1$

// Allow engine and threads to be stopped from outside a thread
// e.g. from beanshell server
// Assumes that there is only one instance of the engine
// at any one time so it is not guaranteed to work ...
// Key shared field: note the volatile keyword. It is assigned by the constructor and
// set back to null by the StopTest runnable, while the static stop* methods read it.
private static volatile StandardJMeterEngine engine;

/*
 * Allow functions etc to register for testStopped notification.
 * Only used by the function parser so far.
 * The list is merged with the testListeners and then cleared.
 * Plain ArrayList: additions go through the synchronized static register() method.
 */
private static final List<TestStateListener> testList = new ArrayList<>();

/** Whether to call System.exit(0) in exit after stopping RMI */
private static final boolean REMOTE_SYSTEM_EXIT = JMeterUtils.getPropDefault("jmeterengine.remote.system.exit", false);

/** Whether to call System.exit(1) if threads won't stop */
private static final boolean SYSTEM_EXIT_ON_STOP_FAIL = JMeterUtils.getPropDefault("jmeterengine.stopfail.system.exit", true);

/** Whether to call System.exit(0) unconditionally at end of non-GUI test */
private static final boolean SYSTEM_EXIT_FORCED = JMeterUtils.getPropDefault("jmeterengine.force.system.exit", false);

/** Flag to show whether test is running. Set to false to stop creating more threads. */
private volatile boolean running = false;

/** Flag to show whether engine is active. Set to false at end of test. */
private volatile boolean active = false;

/** Thread Groups run sequentially */
private volatile boolean serialized = false;

/** tearDown Thread Groups run after shutdown of main threads */
private volatile boolean tearDownOnShutdown = false;

/** The test tree handed to configure(); traversed by run(). */
private HashTree test;

/** Host name passed to the constructor; null when the no-argument constructor is used. */
private final String host;

// The list of current thread groups; may be setUp, main, or tearDown.
// CopyOnWriteArrayList, so it can be iterated while another thread adds or clears groups.
private final List<AbstractThreadGroup> groups = new CopyOnWriteArrayList<>();

/**
 * Creates an engine without a host name; equivalent to {@code new StandardJMeterEngine(null)}.
 */
public StandardJMeterEngine() {
this(null);
}

/**
 * Creates an engine and registers it as the shared static instance used by the static
 * stop methods.
 *
 * @param host host name reported in start/end messages and passed to the test listeners;
 *             may be {@code null}, in which case the host-less listener callbacks are used
 */
public StandardJMeterEngine(String host) {
this.host = host;
// Hack to allow external control
// Note: this publishes "this" to a static field; host is assigned first so that
// the field is already set when other threads can see the instance.
initSingletonEngine(this);
}
/**
 * Publishes the given engine as the shared static instance that the static
 * control methods of this class operate on.
 *
 * @param instance the engine to store in the static {@code engine} field
 */
private static void initSingletonEngine(StandardJMeterEngine instance) {
    engine = instance;
}

/**
 * Clears the shared static engine reference, so the static control methods
 * no longer see an engine.
 */
private static void resetSingletonEngine() {
    engine = null;
}

/**
 * Stops the shared engine immediately by calling {@code stopTest(true)} on it.
 * Does nothing when no engine is currently registered.
 */
public static void stopEngineNow() {
    // Read the volatile field exactly once: the StopTest runnable sets it to null,
    // so a second read after the null check could yield null and throw an NPE.
    final StandardJMeterEngine current = engine;
    if (current != null) { // May be null if called from Unit test
        current.stopTest(true);
    }
}

/**
 * Stops the shared engine gracefully by calling {@code stopTest(false)} on it.
 * Does nothing when no engine is currently registered.
 */
public static void stopEngine() {
    // Read the volatile field exactly once: the StopTest runnable sets it to null,
    // so a second read after the null check could yield null and throw an NPE.
    final StandardJMeterEngine current = engine;
    if (current != null) { // May be null if called from Unit test
        current.stopTest(false);
    }
}

/**
 * Adds a listener to the static list of extra test listeners. That list is merged
 * with the listeners found in the test tree when the engine runs, and then cleared.
 *
 * @param tl the listener to add
 */
public static void register(TestStateListener tl) {
    // Same monitor a static synchronized method would use.
    synchronized (StandardJMeterEngine.class) {
        testList.add(tl);
    }
}

/**
 * Asks the current thread groups of the shared engine to stop the named thread,
 * without requesting an immediate stop.
 *
 * @param threadName name of the thread to stop
 * @return {@code true} if a thread group reported that it stopped the thread;
 *         {@code false} if no engine is registered or no group reported success
 */
public static boolean stopThread(String threadName) {
    return stopThread(threadName, false);
}

/**
 * Same as {@link #stopThread(String)} but requests an immediate stop.
 *
 * @param threadName name of the thread to stop
 * @return {@code true} if a thread group reported that it stopped the thread;
 *         {@code false} if no engine is registered or no group reported success
 */
public static boolean stopThreadNow(String threadName) {
    return stopThread(threadName, true);
}

/**
 * Shared implementation of the two public entry points.
 *
 * @param threadName name of the thread to stop
 * @param now passed through to {@code AbstractThreadGroup.stopThread}
 * @return {@code true} as soon as one thread group returns {@code true}
 */
private static boolean stopThread(String threadName, boolean now) {
    // Single read of the volatile field: the StopTest runnable may null it concurrently,
    // so checking the field and then dereferencing it again could throw an NPE.
    final StandardJMeterEngine current = engine;
    if (current == null) {
        return false; // e.g. not yet started
    }
    // groups is a CopyOnWriteArrayList; iterating it needs no extra synchronization
    for (AbstractThreadGroup threadGroup : current.groups) {
        // Stop asking further groups once one has reported success
        // (the original "wasStopped || ..." short-circuited in the same way).
        if (threadGroup.stopThread(threadName, now)) {
            return true;
        }
    }
    return false;
}

// End of code to allow engine to be controlled remotely

/**
 * Stores the test tree and copies the run options of its first TestPlan element
 * ({@code serialized}, {@code tearDownOnShutdown}) into the engine, then marks the
 * engine as active.
 *
 * @param testTree the test tree to run
 * @throws RuntimeException if the tree contains no TestPlan element
 */
@Override
public void configure(HashTree testTree) {
    // Is testplan serialised?
    SearchByClass<TestPlan> planFinder = new SearchByClass<>(TestPlan.class);
    testTree.traverse(planFinder);

    Iterator<TestPlan> plans = planFinder.getSearchResults().iterator();
    if (!plans.hasNext()) {
        throw new RuntimeException("Could not find the TestPlan class!");
    }

    // Only the first plan found is consulted.
    TestPlan firstPlan = plans.next();
    serialized = firstPlan.isSerialized();
    tearDownOnShutdown = firstPlan.isTearDownOnShutdown();
    active = true;
    test = testTree;
}

/**
 * Starts the test on a new thread named "StandardJMeterEngine" that executes
 * this engine's {@code run()} method. When a host is set, a start banner is
 * printed to standard output first.
 *
 * @throws JMeterEngineException if starting the thread fails; the test is asked to stop first
 */
@Override
public void runTest() throws JMeterEngineException {
    if (host != null) {
        final long startMillis = System.currentTimeMillis();
        final String banner =
                "Starting the test on host " + host + " @ " + new Date(startMillis) + " (" + startMillis + ")";
        System.out.println(banner); // NOSONAR Intentional
    }
    try {
        new Thread(this, "StandardJMeterEngine").start();
    } catch (Exception startFailure) {
        stopTest();
        throw new JMeterEngineException(startFailure);
    }
}

/**
 * Removes from the given list every element that is a thread group or that is
 * not a TestElement at all, leaving only the non-thread-group test elements.
 *
 * @param elements the list to filter in place
 */
private void removeThreadGroups(List<?> elements) {
    elements.removeIf(item -> item instanceof AbstractThreadGroup || !(item instanceof TestElement));
}

/**
 * Tells every collected listener that the test has started. Listeners that are
 * TestBeans are prepared through TestBeanHelper first. The host-aware callback
 * is used when this engine has a host, the plain one otherwise.
 *
 * @param testListeners searcher holding the listeners to notify
 */
private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
    final boolean noHost = host == null; // host is final, so this cannot change mid-loop
    for (TestStateListener listener : testListeners.getSearchResults()) {
        if (listener instanceof TestBean) {
            TestBeanHelper.prepare((TestElement) listener);
        }
        if (noHost) {
            listener.testStarted();
        } else {
            listener.testStarted(host);
        }
    }
}

/**
 * Tells every collected listener that the test has ended, logging (not propagating)
 * any exception a listener throws. When a host is set, an end banner is printed and,
 * if EXIT_AFTER_TEST is enabled, {@code exit()} is called. Finally the engine is
 * marked inactive.
 *
 * @param testListeners searcher holding the listeners to notify
 */
private void notifyTestListenersOfEnd(SearchByClass<TestStateListener> testListeners) {
    log.info("Notifying test listeners of end of test");
    final boolean noHost = host == null; // host is final, so this cannot change mid-loop
    for (TestStateListener listener : testListeners.getSearchResults()) {
        try {
            if (noHost) {
                listener.testEnded();
            } else {
                listener.testEnded(host);
            }
        } catch (Exception e) {
            // One failing listener must not prevent the others from being notified.
            log.warn("Error encountered during shutdown of "+listener.toString(),e);
        }
    }

    if (!noHost) {
        log.info("Test has ended on host {} ", host);
        final long endMillis = System.currentTimeMillis();
        String summary = "Finished the test on host " + host + " @ " + new Date(endMillis) + " (" + endMillis + ")";
        if (EXIT_AFTER_TEST) {
            summary += " - exit requested.";
        }
        System.out.println(summary); // NOSONAR Intentional
        if (EXIT_AFTER_TEST) {
            exit();
        }
    }
    active = false;
}

/**
 * Stops the test (immediately) if it is currently running; otherwise does nothing.
 */
@Override
public void reset() {
    if (!running) {
        return;
    }
    stopTest();
}

/**
 * Stops the test immediately; same as {@code stopTest(true)}.
 */
@Override
public synchronized void stopTest() {
    stopTest(true);
}

/**
 * Starts a new thread running a StopTest task, so the stop itself happens
 * asynchronously and this method returns right away.
 *
 * @param now value handed to the StopTest task
 */
@Override
public synchronized void stopTest(boolean now) {
    new Thread(new StopTest(now)).start();
}

/**
 * Task that stops the running test; executed on its own thread by {@code stopTest(boolean)}.
 */
private class StopTest implements Runnable {
    /** Selects the branch taken in {@link #run()}: tell threads to stop and verify, or just stop the groups. */
    private final boolean now;

    private StopTest(boolean b) {
        now = b;
    }

    /**
     * For each current thread group, invoke:
     * <ul>
     * <li>{@link AbstractThreadGroup#stop()} - set stop flag</li>
     * </ul>
     */
    private void stopAllThreadGroups() {
        // groups is a CopyOnWriteArrayList; iterating it needs no extra synchronization
        for (AbstractThreadGroup group : groups) {
            group.stop();
        }
    }

    /**
     * For each thread group, invoke {@link AbstractThreadGroup#tellThreadsToStop()}
     */
    private void tellThreadGroupsToStop() {
        // groups is a CopyOnWriteArrayList; iterating it needs no extra synchronization
        for (AbstractThreadGroup group : groups) {
            group.tellThreadsToStop();
        }
    }

    /**
     * @return boolean true if all threads of all Thread Groups stopped
     */
    private boolean verifyThreadsStopped() {
        // Groups after the first one that reports "not stopped" are not asked.
        for (AbstractThreadGroup group : groups) {
            if (!group.verifyThreadsStopped()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return total of active threads in all Thread Groups
     */
    private int countStillActiveThreads() {
        int remainingThreads = 0;
        for (AbstractThreadGroup group : groups) {
            remainingThreads += group.numberOfActiveThreads();
        }
        return remainingThreads;
    }

    /**
     * Clears the running flag and the shared engine reference, then either stops the
     * thread groups, or tells their threads to stop, waits 10 ms per still-active
     * thread and reports an error if they did not all stop.
     */
    @Override
    public void run() {
        running = false;
        resetSingletonEngine();

        if (!now) {
            stopAllThreadGroups();
            return;
        }

        tellThreadGroupsToStop();
        pause(10L * countStillActiveThreads());
        if (verifyThreadsStopped()) {
            return; // else will be done by threadFinished()
        }

        // we totally failed to stop the test
        if (!JMeter.isNonGUI()) {
            JMeterUtils.reportErrorToUser(
                    JMeterUtils.getResString("stopping_test_failed"), //$NON-NLS-1$
                    JMeterUtils.getResString("stopping_test_title")); //$NON-NLS-1$
            return;
        }

        // TODO should we call test listeners? That might hang too ...
        log.error(JMeterUtils.getResString("stopping_test_failed")); //$NON-NLS-1$
        if (SYSTEM_EXIT_ON_STOP_FAIL) { // default is true
            log.error("Exiting");
            System.out.println("Fatal error, could not stop test, exiting"); // NOSONAR Intentional
            System.exit(1); // NOSONAR Intentional
        } else {
            System.out.println("Fatal error, could not stop test"); // NOSONAR Intentional
        }
    }
}

/**
 * Executes the configured test; this is the body of the thread started by {@code runTest()}.
 * <p>
 * In order: compiles the tree with a PreCompiler; collects the TestStateListener elements and
 * merges in the listeners added through {@code register()}; notifies the listeners of the start;
 * runs the setUp thread groups, then the main thread groups, then the tearDown thread groups,
 * waiting for each phase to finish; notifies the listeners of the end.
 */
@Override
public void run() {
    log.info("Running the test!");
    running = true;

    /*
     * Ensure that the sample variables are correctly initialised for each run.
     */
    SampleEvent.initSampleVariables();

    JMeterContextService.startTest();
    try {
        PreCompiler compiler = new PreCompiler();
        test.traverse(compiler);
    } catch (RuntimeException e) {
        log.error("Error occurred compiling the tree:",e);
        JMeterUtils.reportErrorToUser("Error occurred compiling the tree: - see log file", e);
        // NOTE(review): this early return leaves 'running' and 'active' set to true and skips
        // notifyTestListenersOfEnd() / JMeterContextService.endTest() - confirm callers cope.
        return; // no point continuing
    }
    /*
     * Notification of test listeners needs to happen after function
     * replacement, but before setting RunningVersion to true.
     */
    SearchByClass<TestStateListener> testListeners = new SearchByClass<>(TestStateListener.class); // TL - S&E
    test.traverse(testListeners);

    // Merge in any additional test listeners
    // currently only used by the function parser.
    // register() adds to testList while holding the class monitor, so the list must be
    // read and cleared under the same monitor to avoid racing with a concurrent add.
    synchronized (StandardJMeterEngine.class) {
        testListeners.getSearchResults().addAll(testList);
        testList.clear(); // no longer needed
    }

    test.traverse(new TurnElementsOn());
    notifyTestListenersOfStart(testListeners);

    List<?> testLevelElements = new LinkedList<>(test.list(test.getArray()[0]));
    removeThreadGroups(testLevelElements);

    SearchByClass<SetupThreadGroup> setupSearcher = new SearchByClass<>(SetupThreadGroup.class);
    SearchByClass<AbstractThreadGroup> searcher = new SearchByClass<>(AbstractThreadGroup.class);
    SearchByClass<PostThreadGroup> postSearcher = new SearchByClass<>(PostThreadGroup.class);

    test.traverse(setupSearcher);
    test.traverse(searcher);
    test.traverse(postSearcher);

    TestCompiler.initialize();
    // for each thread group, generate threads
    // hand each thread the sampler controller
    // and the listeners, and the timer
    Iterator<SetupThreadGroup> setupIter = setupSearcher.getSearchResults().iterator();
    Iterator<AbstractThreadGroup> iter = searcher.getSearchResults().iterator();
    Iterator<PostThreadGroup> postIter = postSearcher.getSearchResults().iterator();

    ListenerNotifier notifier = new ListenerNotifier();

    int groupCount = 0;
    JMeterContextService.clearTotalThreads();

    if (setupIter.hasNext()) {
        log.info("Starting setUp thread groups");
        while (running && setupIter.hasNext()) {//for each setup thread group
            AbstractThreadGroup group = setupIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting setUp ThreadGroup: {} : {} ", groupCount, groupName);
            startThreadGroup(group, groupCount, setupSearcher, testLevelElements, notifier);
            if (serialized && setupIter.hasNext()) {
                log.info("Waiting for setup thread group: {} to finish before starting next setup group",
                        groupName);
                group.waitThreadsStopped();
            }
        }
        log.info("Waiting for all setup thread groups to exit");
        //wait for all Setup Threads To Exit
        waitThreadsStopped();
        log.info("All Setup Threads have ended");
        groupCount=0;
        JMeterContextService.clearTotalThreads();
    }

    groups.clear(); // The groups have all completed now

    /*
     * Here's where the test really starts. Run a Full GC now: it's no harm
     * at all (just delays test start by a tiny amount) and hitting one too
     * early in the test can impair results for short tests.
     */
    JMeterUtils.helpGC();

    JMeterContextService.getContext().setSamplingStarted(true);
    boolean mainGroups = running; // still running at this point, i.e. setUp was not cancelled
    while (running && iter.hasNext()) {// for each thread group
        AbstractThreadGroup group = iter.next();
        //ignore Setup and Post here. We could have filtered the searcher. but then
        //future Thread Group objects wouldn't execute.
        if (group instanceof SetupThreadGroup ||
                group instanceof PostThreadGroup) {
            continue;
        }
        groupCount++;
        String groupName = group.getName();
        log.info("Starting ThreadGroup: {} : {}", groupCount, groupName);
        startThreadGroup(group, groupCount, searcher, testLevelElements, notifier);
        if (serialized && iter.hasNext()) {
            log.info("Waiting for thread group: {} to finish before starting next group", groupName);
            group.waitThreadsStopped();
        }
    } // end of thread groups
    if (groupCount == 0){ // No TGs found
        log.info("No enabled thread groups found");
    } else {
        if (running) {
            log.info("All thread groups have been started");
        } else {
            log.info("Test stopped - no more thread groups will be started");
        }
    }

    //wait for all Test Threads To Exit
    waitThreadsStopped();
    groups.clear(); // The groups have all completed now

    if (postIter.hasNext()){
        groupCount = 0;
        JMeterContextService.clearTotalThreads();
        log.info("Starting tearDown thread groups");
        if (mainGroups && !running) { // i.e. shutdown/stopped during main thread groups
            running = tearDownOnShutdown; // re-enable for tearDown if necessary
        }
        while (running && postIter.hasNext()) {//for each tearDown thread group
            AbstractThreadGroup group = postIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting tearDown ThreadGroup: {} : {}", groupCount, groupName);
            startThreadGroup(group, groupCount, postSearcher, testLevelElements, notifier);
            if (serialized && postIter.hasNext()) {
                log.info("Waiting for post thread group: {} to finish before starting next post group", groupName);
                group.waitThreadsStopped();
            }
        }
        waitThreadsStopped(); // wait for Post threads to stop
    }

    notifyTestListenersOfEnd(testListeners);
    JMeterContextService.endTest();
    if (JMeter.isNonGUI() && SYSTEM_EXIT_FORCED) {
        log.info("Forced JVM shutdown requested at end of test");
        System.exit(0); // NOSONAR Intentional
    }
}

/**
 * Starts one thread group: adds its thread count to the running total, logs which
 * on-error action is configured, attaches the test-level elements to the group's
 * subtree, records the group in {@code groups} and starts it. A
 * JMeterStopTestException raised along the way is reported to the user and not rethrown.
 *
 * @param group the thread group to start
 * @param groupCount number of the group within its phase, passed on to the group
 * @param searcher the searcher that found the group; used to fetch its subtree
 * @param testLevelElements elements added under the group in its subtree
 * @param notifier listener notifier handed to the group
 */
private void startThreadGroup(AbstractThreadGroup group, int groupCount, SearchByClass<?> searcher,
        List<?> testLevelElements, ListenerNotifier notifier) {
    try {
        final int threadCount = group.getNumThreads();
        JMeterContextService.addTotalThreads(threadCount);

        final boolean stopTestOnError = group.getOnErrorStopTest();
        final boolean stopTestNowOnError = group.getOnErrorStopTestNow();
        final boolean stopThreadOnError = group.getOnErrorStopThread();
        final boolean nextLoopOnError = group.getOnErrorStartNextLoop();
        final String name = group.getName();
        log.info("Starting {} threads for group {}.", threadCount, name);

        // First matching setting wins, in this order.
        final String onErrorDescription;
        if (stopTestOnError) {
            onErrorDescription = "Test will stop on error";
        } else if (stopTestNowOnError) {
            onErrorDescription = "Test will stop abruptly on error";
        } else if (stopThreadOnError) {
            onErrorDescription = "Thread will stop on error";
        } else if (nextLoopOnError) {
            onErrorDescription = "Thread will start next loop on error";
        } else {
            onErrorDescription = "Thread will continue on error";
        }
        log.info(onErrorDescription);

        ListedHashTree groupTree = (ListedHashTree) searcher.getSubTree(group);
        groupTree.add(group, testLevelElements);

        groups.add(group);
        group.start(groupCount, notifier, groupTree, this);
    } catch (JMeterStopTestException ex) { // NOSONAR Reported by log
        JMeterUtils.reportErrorToUser("Error occurred starting thread group :" + group.getName()+ ", error message:"+ex.getMessage()
                +", \r\nsee log file for more details", ex);
    }
}

/**
 * Blocks until every thread group currently listed in {@code groups} reports,
 * one after another, that its threads have stopped.
 */
private void waitThreadsStopped() {
    // groups is a CopyOnWriteArrayList; iterating it needs no extra synchronization
    groups.forEach(AbstractThreadGroup::waitThreadsStopped);
}

/**
 * Clean shutdown ie, wait for end of current running samplers.
 * Calls {@code stopTest(false)} on the shared engine, if one is still registered.
 */
public void askThreadsToStop() {
    // Read the volatile field exactly once: a StopTest thread nulls it, so re-reading
    // it after the null check could yield null and throw an NPE.
    final StandardJMeterEngine current = engine;
    if (current != null) { // Will be null if StopTest thread has started
        current.stopTest(false);
    }
}

/**
 * Remote exit.
 * Called by RemoteJMeterEngineImpl.rexit()
 * and by notifyTestListenersOfEnd() iff EXIT_AFTER_TEST is true.
 * <p>
 * Tidies up RMI and, when REMOTE_SYSTEM_EXIT is enabled, starts a thread that
 * pauses for one second and then calls {@code System.exit(0)}.
 */
@Override
public void exit() {
    ClientJMeterEngine.tidyRMI(log); // This should be enough to allow server to exit.
    if (!REMOTE_SYSTEM_EXIT) { // default is false
        return;
    }
    log.warn("About to run System.exit(0) on {}", host);
    // Needs to be run in a separate thread to allow RMI call to return OK
    Runnable delayedExit = () -> {
        pause(1000); // Allow RMI to complete
        log.info("Bye from {}", host);
        System.out.println("Bye from "+host); // NOSONAR Intentional
        System.exit(0); // NOSONAR Intentional
    };
    new Thread(delayedExit).start();
}

/**
 * Sleeps for the given number of milliseconds. If the sleep is interrupted, the
 * thread's interrupt flag is set again and the method returns early.
 *
 * @param millis how long to sleep, in milliseconds
 */
private void pause(long millis) {
    try {
        TimeUnit.MILLISECONDS.sleep(millis);
    } catch (InterruptedException interrupted) {
        // Restore the flag so code further up the stack can see the interruption.
        Thread.currentThread().interrupt();
    }
}

/**
 * Logs the given properties and copies all of them into the JMeter properties
 * returned by {@code JMeterUtils.getJMeterProperties()}.
 *
 * @param p the properties to apply
 */
@Override
public void setProperties(Properties p) {
    log.info("Applying properties {}", p);
    Properties jmeterProperties = JMeterUtils.getJMeterProperties();
    jmeterProperties.putAll(p);
}

/**
 * @return the current value of the {@code active} flag, which is set in
 *         {@code configure()} and cleared once the test listeners have been
 *         told that the test ended
 */
@Override
public boolean isActive() {
    return active;
}
}

RemoteJMeterEngine 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Remote (RMI) view of a JMeter engine: the interface extends {@code Remote} and every
 * method is declared to throw {@code RemoteException}.
 * NOTE(review): the method names mirror the JMeterEngine operations with an "r" prefix;
 * presumably each one forwards to the matching engine call - confirm against the implementation.
 */
public interface RemoteJMeterEngine extends Remote {
/**
 * Remote counterpart of configure.
 * @param testTree the test tree to run
 * @param host host name - presumably the one reported by the engine; TODO confirm
 * @param jmxBase presumably the base directory of the test plan file; TODO confirm
 * @param scriptName presumably the name of the test plan file; TODO confirm
 * @throws RemoteException if the remote call fails
 */
void rconfigure(HashTree testTree, String host, File jmxBase, String scriptName) throws RemoteException;

/**
 * Remote counterpart of runTest.
 * @throws RemoteException if the remote call fails
 * @throws JMeterEngineException if the engine reports an error
 */
void rrunTest() throws RemoteException, JMeterEngineException;

/**
 * Remote counterpart of stopTest.
 * @param now whether the stop should be immediate
 * @throws RemoteException if the remote call fails
 */
void rstopTest(boolean now) throws RemoteException;

/**
 * Remote counterpart of reset.
 * @throws RemoteException if the remote call fails
 */
void rreset() throws RemoteException;

/**
 * Remote counterpart of setProperties.
 * @param p the properties to set
 * @throws RemoteException if the remote call fails
 */
void rsetProperties(Properties p) throws RemoteException;

/**
 * Remote counterpart of exit.
 * @throws RemoteException if the remote call fails
 */
void rexit() throws RemoteException;
}