LIANHK
<?php
// Constrained by PHP performance limits (including pthreads' own limitations
// as well as code-encryption and lock-wait issues), testing showed that this
// sleep interval is practically mandatory.
// Solving the problem properly would require using golang.
// Polling interval handed to usleep(), i.e. expressed in microseconds.
define ( 'USLEEP_TIME', 10000 );
/**
 * Read up to $length bytes from a socket by polling it without blocking.
 *
 * The loop ends as soon as one of the following happens:
 *  - $length bytes have been collected,
 *  - the peer closed the connection (socket_recv() returned 0),
 *  - socket_recv() failed with anything other than "try again later",
 *  - more than $max_time seconds have elapsed,
 *  - $socket is not a usable socket handle.
 *
 * @param resource|\Socket $socket   Socket to read from.
 * @param int              $length   Number of bytes wanted.
 * @param int|float        $max_time Time budget in seconds.
 * @return array Element 0 is the number of bytes received, element 1 the
 *               received bytes as a string.
 */
function recv_data($socket, $length, $max_time) {
    $started = microtime ( true );
    $recv_length = 0;
    $data = '';
    while ( microtime ( true ) - $started <= $max_time && $recv_length < $length ) {
        // ext/sockets returns resources before PHP 8 and Socket objects from
        // PHP 8 on; accept both so the function keeps working after an upgrade.
        if (! is_resource ( $socket ) && ! ($socket instanceof \Socket)) {
            break;
        }
        $temp_data = null;
        $recv_length_this = socket_recv ( $socket, $temp_data, $length - $recv_length, MSG_DONTWAIT );
        if ($recv_length_this === 0) {
            // Orderly shutdown by the peer: nothing more will ever arrive.
            break;
        }
        if ($recv_length_this === false) {
            // Only "no data yet" style errors are worth retrying; any other
            // error would otherwise keep the loop spinning until $max_time.
            $error = socket_last_error ( $socket );
            $retryable = array (
                    SOCKET_EAGAIN,
                    SOCKET_EWOULDBLOCK,
                    SOCKET_EINTR
            );
            if (! in_array ( $error, $retryable, true )) {
                break;
            }
        } else {
            $recv_length += $recv_length_this;
            $data .= $temp_data;
            if ($recv_length >= $length) {
                // Request satisfied: return without an extra sleep.
                break;
            }
        }
        // Pause between polls so the loop does not busy-spin.
        usleep ( USLEEP_TIME );
    }
    return array (
            $recv_length,
            $data
    );
}
/**
 * Fetch the shared-memory variable stored under $index while holding $sem.
 *
 * @param mixed $sem   Semaphore handle guarding the segment.
 * @param mixed $shm   Shared-memory handle.
 * @param int   $index Variable key inside the segment.
 * @return mixed Whatever shm_get_var() yields for $index, or an empty array
 *               when the semaphore could not be acquired.
 */
function data_get($sem, $shm, $index) {
    if (! sem_acquire ( $sem, false )) {
        return array ();
    }
    $stored = shm_get_var ( $shm, $index );
    sem_release ( $sem );
    return $stored;
}
/**
 * Store $data under $index in shared memory while holding $sem and return
 * the value that was stored there before.
 *
 * @param mixed $sem   Semaphore handle guarding the segment.
 * @param mixed $shm   Shared-memory handle.
 * @param int   $index Variable key inside the segment.
 * @param mixed $data  Value to store.
 * @return mixed The previous value of the variable, or an empty array when
 *               there was none or the semaphore could not be acquired.
 */
function data_set($sem, $shm, $index, $data) {
    // Initialised up front so the return value is defined even when the lock
    // cannot be taken (matches what data_get() returns in that case).
    $data_array = array ();
    if (sem_acquire ( $sem, false )) {
        if (shm_has_var ( $shm, $index )) {
            $data_array = shm_get_var ( $shm, $index );
        }
        shm_put_var ( $shm, $index, $data );
        sem_release ( $sem );
    }
    return $data_array;
}
/**
 * Merge the key/value pairs of $array into the array stored under $index in
 * shared memory (existing keys are overwritten) while holding $sem.
 *
 * @param mixed $sem   Semaphore handle guarding the segment.
 * @param mixed $shm   Shared-memory handle.
 * @param int   $index Variable key inside the segment.
 * @param array $array Pairs to merge into the stored array.
 * @return array The merged array that was written back, or an empty array
 *               when the semaphore could not be acquired.
 */
function data_merge($sem, $shm, $index, $array) {
    // Defined before the lock attempt so a failed acquire returns a value
    // instead of an undefined variable.
    $data_array = array ();
    if (sem_acquire ( $sem, false )) {
        // A slot that was never written starts out as an empty array rather
        // than triggering a shm_get_var() warning.
        if (shm_has_var ( $shm, $index )) {
            $data_array = shm_get_var ( $shm, $index );
        }
        foreach ( $array as $key => $value ) {
            $data_array [$key] = $value;
        }
        shm_put_var ( $shm, $index, $data_array );
        sem_release ( $sem );
    }
    return $data_array;
}
/**
 * Append $array as one new element to the list stored under $index in shared
 * memory while holding $sem.
 *
 * @param mixed $sem   Semaphore handle guarding the segment.
 * @param mixed $shm   Shared-memory handle.
 * @param int   $index Variable key inside the segment.
 * @param mixed $array Element to append (stored as a single entry).
 * @return array The list that was written back, or an empty array when the
 *               semaphore could not be acquired.
 */
function data_append($sem, $shm, $index, $array) {
    // Defined before the lock attempt so a failed acquire returns a value
    // instead of an undefined variable.
    $data_array = array ();
    if (sem_acquire ( $sem, false )) {
        // A slot that was never written starts out as an empty list rather
        // than triggering a shm_get_var() warning.
        if (shm_has_var ( $shm, $index )) {
            $data_array = shm_get_var ( $shm, $index );
        }
        $data_array [] = $array;
        shm_put_var ( $shm, $index, $data_array );
        sem_release ( $sem );
    }
    return $data_array;
}
/**
* Run either a TCP server (when $ip is null) or a TCP client, and hand every
* connection to a pair of objects created from $read_class and $write_class.
*
* Layout of the shared-memory variables used to exchange data between threads:
*
* Thread-shared data
* key: $index
* Usually an array (written by the read thread and read by the write thread,
* unless the write thread has a message to write in order to notify other
* read/write thread pairs)
*
* Client connection information (READ, WRITE, IP, PORT, START_TIME)
* key: $max_connection + $index
* Status flags of the read/write thread pair
* READ: 0 not started, 1 running, 2 exited
* WRITE: 0 not started, 1 running, 2 exited
*
* Main-thread running flag
* key: 2 * $max_connection
* false tells all threads to exit
*
* The objects are constructed with one context array:
* [0] slot index, [1] semaphore, [2] shared memory, [3] connection socket,
* [4] connection-slot count, [5] $max_wait_time.
*
* @param string|null $ip             Address to connect to; null selects server mode on 0.0.0.0.
* @param int         $port           Port to bind (server mode) or connect to (client mode).
* @param string      $read_class     Class instantiated as the reader for each connection.
* @param string      $write_class    Class instantiated as the writer for each connection.
* @param int         $max_connection Number of connection slots; also scales the shared-memory size.
* @param int         $max_wait_time  Passed through as element 5 of the context array; not used here otherwise.
*/
function connect($ip, $port, $read_class, $write_class, $max_connection = 20, $max_wait_time = 5) {
// Directory for the PID file: the LOGGER constant when defined, else the current directory.
$directory = defined ( 'LOGGER' ) ? constant ( 'LOGGER' ) : '.';
if (! file_exists ( $directory )) {
mkdir ( $directory, 0777, true );
}
$log_prefix = defined ( 'LOG_PREFIX' ) ? constant ( 'LOG_PREFIX' ) : __FILE__;
// NOTE(review): no directory separator is inserted between $directory and
// $log_prefix, and the default prefix is the full path from __FILE__ -
// confirm the resulting PID-file path is the intended one.
file_put_contents ( $directory . $log_prefix . '.pid', getmypid () );
set_time_limit ( 0 );
ob_implicit_flush ();
// Derive a System V IPC key from a newly created temp file and use it for both
// the semaphore and the shared-memory segment.
// NOTE(review): tempnam() receives time() as its directory argument and the
// temp file is never removed in this function - confirm this is intended.
$key = ftok ( tempnam ( time (), rand ( 0, 10000 ) ), 's' );
$sem = sem_get ( $key );
$shm = shm_attach ( $key, $max_connection * 1000000 );
// Initialise every slot: an empty data array plus "not started" status flags.
for($i = 0; $i < $max_connection; $i ++) {
data_set ( $sem, $shm, $i, array () );
data_set ( $sem, $shm, $max_connection + $i, array (
'READ' => 0,
'WRITE' => 0
) );
}
// Main-thread running flag; the accept loop below runs while it stays truthy.
data_set ( $sem, $shm, 2 * $max_connection, true );
$sockets = array ();
$reads = array ();
$writes = array ();
if ($ip === null) {
// Server mode: non-blocking listening socket bound to all interfaces.
$socket = socket_create ( AF_INET, SOCK_STREAM, SOL_TCP );
socket_set_option ( $socket, SOL_SOCKET, SO_REUSEADDR, 1 );
socket_set_nonblock ( $socket );
$ip = '0.0.0.0';
if (! @socket_bind ( $socket, $ip, $port )) {
die ( 'can not bind "' . $ip . ':' . $port . '"' . PHP_EOL );
}
socket_listen ( $socket );
// Context array template; elements 0 (slot index) and 3 (socket) are filled
// in for each accepted connection.
$array = array (
null,
$sem,
$shm,
null,
$max_connection,
$max_wait_time
);
// The running flag is read directly with shm_get_var(), without taking $sem.
while ( shm_get_var ( $array [2], 2 * $max_connection ) ) {
// The listening socket is non-blocking; the result is validated with
// is_resource() further down.
$s = socket_accept ( $socket );
// Look for the first slot whose READ and WRITE flags are both 0 or 3.
$index = - 1;
for($i = 0; $i < $max_connection; $i ++) {
$data_array = shm_get_var ( $array [2], $max_connection + $i );
// NOTE(review): the header comment lists states 0/1/2, but a slot is
// treated as free here when the flag is 0 or 3 - confirm which value the
// thread classes write when they exit.
if (in_array ( $data_array ['READ'], array (
0,
3
) ) && in_array ( $data_array ['WRITE'], array (
0,
3
) )) {
// $index is still -1 at this point because the loop breaks as soon as it is set.
if ($index == - 1) {
// Close whatever socket was stored for this slot earlier; $sockets [$i] may
// not exist yet, hence the error suppression.
@socket_shutdown ( $sockets [$i], 2 );
@socket_close ( $sockets [$i] );
$index = $i;
break;
}
}
}
if (is_resource ( $s )) {
if ($index == - 1) {
// No free slot: drop the new connection immediately.
@socket_shutdown ( $s, 2 );
socket_close ( $s );
} else {
socket_set_nonblock ( $s );
// Note: this overwrites the $port parameter with the peer's port.
socket_getpeername ( $s, $address, $port );
$sockets [$index] = $s;
$array [0] = $index;
$array [3] = $s;
// Mark the slot as running and record the peer details.
data_set ( $array [1], $array [2], $array [4] + $array [0], array (
'READ' => 1,
'WRITE' => 1,
'IP' => $address,
'PORT' => $port,
'START_TIME' => time ()
) );
// NOTE(review): the created objects are kept in $reads / $writes but never
// joined in this branch - presumably the classes start themselves; confirm.
$reads [$index] = new $read_class ( $array );
$writes [$index] = new $write_class ( $array );
}
}
// Pace the accept loop; the pause grows with the number of slots.
usleep ( $max_connection * USLEEP_TIME );
}
} else {
// Client mode: retry the connection once per second until socket_connect() succeeds.
$socket = socket_create ( AF_INET, SOCK_STREAM, SOL_TCP );
socket_set_nonblock ( $socket );
while ( ! @socket_connect ( $socket, $ip, $port ) ) {
sleep ( 1 );
}
// Status entry stored under key 1, i.e. slot count 1 + slot index 0 as passed
// in the context array below.
// NOTE(review): the slots and the running flag above were initialised with the
// $max_connection parameter, whereas the context array reports a slot count
// of 1 - verify the thread classes read the running flag from the same key.
data_set ( $sem, $shm, 1, array (
'READ' => 1,
'WRITE' => 1,
'IP' => $ip,
'PORT' => $port,
'START_TIME' => time ()
) );
$array = array (
0,
$sem,
$shm,
$socket,
1,
$max_wait_time
);
$read = new $read_class ( $array );
$write = new $write_class ( $array );
// Presumably pthreads Thread objects: join() would block until each finishes - confirm.
$read->join ();
$write->join ();
}
// Tear down the listening/client socket and the IPC objects.
@socket_shutdown ( $socket, 2 );
socket_close ( $socket );
sem_remove ( $sem );
shm_remove ( $shm );
}