refactor: move Connection to use ConnectionTransport (#3217)

Currently connection assumes that transport is a websocket
and tries to handle websocket-related errors.

This patch:
- moves ConnectionTransport interface to use callbacks instead
  of events. This way it could be used in browser context as well.
- introduces WebSocketTransport that implements ConnectionTransport
  interface for ws.

This is a preparation step for 2 things:
- exposing `transport` option in the `puppeteer.connect` method
- better support for `browserify`

References #2119
This commit is contained in:
Andrey Lushnikov 2018-09-07 21:36:16 +01:00 committed by GitHub
parent c967aebc84
commit 1b2c8113ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 42 deletions

View File

@ -16,35 +16,9 @@
const {helper, assert} = require('./helper');
const debugProtocol = require('debug')('puppeteer:protocol');
const debugSession = require('debug')('puppeteer:session');
const EventEmitter = require('events');
const WebSocket = require('ws');
const Pipe = require('./Pipe');
class Connection extends EventEmitter {
/**
* @param {string} url
* @param {number=} delay
* @return {!Promise<!Connection>}
*/
static async createForWebSocket(url, delay = 0) {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url, { perMessageDeflate: false });
ws.on('open', () => resolve(new Connection(url, ws, delay)));
ws.on('error', reject);
});
}
/**
* @param {!NodeJS.WritableStream} pipeWrite
* @param {!NodeJS.ReadableStream} pipeRead
* @param {number=} delay
* @return {!Connection}
*/
static createForPipe(pipeWrite, pipeRead, delay = 0) {
return new Connection('', new Pipe(pipeWrite, pipeRead), delay);
}
/**
* @param {string} url
* @param {!Puppeteer.ConnectionTransport} transport
@ -59,8 +33,8 @@ class Connection extends EventEmitter {
this._delay = delay;
this._transport = transport;
this._transport.on('message', this._onMessage.bind(this));
this._transport.on('close', this._onClose.bind(this));
this._transport.onmessage = this._onMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
/** @type {!Map<string, !CDPSession>}*/
this._sessions = new Map();
}
@ -133,9 +107,8 @@ class Connection extends EventEmitter {
this._closeCallback();
this._closeCallback = null;
}
this._transport.removeAllListeners();
// If transport throws any error at this point of time, we don't care and should swallow it.
this._transport.on('error', () => {});
this._transport.onmessage = null;
this._transport.onclose = null;
for (const callback of this._callbacks.values())
callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
this._callbacks.clear();

View File

@ -24,6 +24,8 @@ const readline = require('readline');
const fs = require('fs');
const {helper, debugError} = require('./helper');
const {TimeoutError} = require('./Errors');
const WebSocketTransport = require('./WebSocketTransport');
const PipeTransport = require('./PipeTransport');
const mkdtempAsync = helper.promisify(fs.mkdtemp);
const removeFolderAsync = helper.promisify(removeFolder);
@ -158,9 +160,11 @@ class Launcher {
try {
if (!usePipe) {
const browserWSEndpoint = await waitForWSEndpoint(chromeProcess, timeout, this._preferredRevision);
connection = await Connection.createForWebSocket(browserWSEndpoint, slowMo);
const transport = await WebSocketTransport.create(browserWSEndpoint);
connection = new Connection(browserWSEndpoint, transport, slowMo);
} else {
connection = Connection.createForPipe(/** @type {!NodeJS.WritableStream} */(chromeProcess.stdio[3]), /** @type {!NodeJS.ReadableStream} */ (chromeProcess.stdio[4]), slowMo);
const transport = new PipeTransport(/** @type {!NodeJS.WritableStream} */(chromeProcess.stdio[3]), /** @type {!NodeJS.ReadableStream} */ (chromeProcess.stdio[4]));
connection = new Connection('', transport, slowMo);
}
const browser = await Browser.create(connection, [], ignoreHTTPSErrors, defaultViewport, chromeProcess, gracefullyCloseChrome);
await ensureInitialPage(browser);
@ -276,7 +280,8 @@ class Launcher {
defaultViewport = {width: 800, height: 600},
slowMo = 0,
} = options;
const connection = await Connection.createForWebSocket(browserWSEndpoint, slowMo);
const transport = await WebSocketTransport.create(browserWSEndpoint);
const connection = new Connection(browserWSEndpoint, transport, slowMo);
const {browserContextIds} = await connection.send('Target.getBrowserContexts');
return Browser.create(connection, browserContextIds, ignoreHTTPSErrors, defaultViewport, null, () => connection.send('Browser.close').catch(debugError));
}

View File

@ -14,20 +14,23 @@
* limitations under the License.
*/
const {helper} = require('./helper');
const EventEmitter = require('events');
class Pipe extends EventEmitter {
/**
* @implements {!Puppeteer.ConnectionTransport}
*/
class PipeTransport {
/**
* @param {!NodeJS.WritableStream} pipeWrite
* @param {!NodeJS.ReadableStream} pipeRead
*/
constructor(pipeWrite, pipeRead) {
super();
this._pipeWrite = pipeWrite;
this._pendingMessage = '';
this._eventListeners = [
helper.addEventListener(pipeRead, 'data', buffer => this._dispatch(buffer))
];
this.onmessage = null;
this.onclose = null;
}
/**
@ -48,12 +51,14 @@ class Pipe extends EventEmitter {
return;
}
const message = this._pendingMessage + buffer.toString(undefined, 0, end);
this.emit('message', message);
if (this.onmessage)
this.onmessage.call(null, message);
let start = end + 1;
end = buffer.indexOf('\0', start);
while (end !== -1) {
this.emit('message', buffer.toString(undefined, start, end));
if (this.onmessage)
this.onmessage.call(null, buffer.toString(undefined, start, end));
start = end + 1;
end = buffer.indexOf('\0', start);
}
@ -66,4 +71,4 @@ class Pipe extends EventEmitter {
}
}
module.exports = Pipe;
module.exports = PipeTransport;

65
lib/WebSocketTransport.js Normal file
View File

@ -0,0 +1,65 @@
/**
* Copyright 2018 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const WebSocket = require('ws');
/**
* @implements {!Puppeteer.ConnectionTransport}
*/
class WebSocketTransport {
/**
* @param {string} url
* @return {!Promise<!WebSocketTransport>}
*/
static create(url) {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url, { perMessageDeflate: false });
ws.on('open', () => resolve(new WebSocketTransport(ws)));
ws.on('error', reject);
});
}
/**
* @param {!WebSocket} ws
*/
constructor(ws) {
this._ws = ws;
this._ws.on('message', event => {
if (this.onmessage)
this.onmessage.call(null, event);
});
this._ws.on('close', event => {
if (this.onclose)
this.onclose.call(null);
});
// Silently ignore all errors - we don't know what to do with them.
this._ws.on('error', () => {});
this.onmessage = null;
this.onclose = null;
}
/**
* @param {string} message
*/
send(message) {
this._ws.send(message);
}
close() {
this._ws.close();
}
}
module.exports = WebSocketTransport;

4
lib/externs.d.ts vendored
View File

@ -32,9 +32,11 @@ declare global {
export class Response extends RealResponse { }
export class Request extends RealRequest { }
export interface ConnectionTransport extends NodeJS.EventEmitter {
export interface ConnectionTransport {
send(string);
close();
onmessage?: (message: string) => void,
onclose?: () => void,
}
export interface ChildProcess extends child_process.ChildProcess { }

View File

@ -31,9 +31,10 @@ const EXCLUDE_CLASSES = new Set([
'Multimap',
'NavigatorWatcher',
'NetworkManager',
'Pipe',
'PipeTransport',
'TaskQueue',
'WaitTask',
'WebSocketTransport',
]);
const EXCLUDE_PROPERTIES = new Set([