PHPIndex

This page lists files in the current directory. You can view content, get download/execute commands for Wget, Curl, or PowerShell, or filter the list using wildcards (e.g., `*.sh`).

i18n
lib
static
app.php
wget 'https://sme10.lists2.roe3.org/kodbox/plugins/storeImport/app.php'
View Content
<?php

/**
 * 存储导入:已存文件数据扫描写入数据库
 * OSS:sqlite+file:100w文件耗时6.5-7小时;mysql+redis:100w文件耗时3.5-4小时
 */

// TODO 待更新为列表分段写入缓存
class storeImportPlugin extends PluginBase{
	function __construct(){
		parent::__construct();
	}
	public function regist(){
		$this->hookRegist(array(
			'user.commonJs.insert'			=> 'storeImportPlugin.echoJs',
			'admin.storage.import.before'	=> 'storeImportPlugin.cliImport',
		));
	}
	public function echoJs(){
		$this->echoFile('static/main.js');
	}

	public function api($path){
		$parse	= KodIO::parse($path);
		$store = Model('Storage')->driverInfo($parse['id']);
		include_once($this->pluginPath.'lib/Driver.class.php');
		return new impDriver($store);
	}

	// 数据导入
	public function start(){
		KodUser::checkRoot();
		$pathFrom = Input::get('pathFrom','require');
		$pathTo	  = Input::get('pathTo','require');
		$pathFrom = trim($pathFrom, '/');
        $pathTo	  = trim($pathTo, '/');
		$taskId	  = $this->_taskId();

		// 0.任务进度
		$this->getProcess($pathFrom, $pathTo);

		// 1.1 检查原始目录
		$parse = KodIO::parse($pathFrom);
		if ($parse['type'] != KodIO::KOD_IO) {
			$this->showJson(LNG('storeImport.main.ioPathErr'), false);
		}
		$store = Model('Storage')->driverInfo($parse['id']);
		if (!$store) {
			$this->showJson(LNG('storeImport.main.ioStoreErr'), false);
		}
		$type = strtolower($store['driver']);
		$ioList = array(
			'sg' => array('local','oss','qiniu','uss'),	// ftp性能较差,暂不考虑支持
			's3' => array('s3','bos','cos','eds','eos','jos','minio','obs','oos','moss','nos')
		);
		if (!in_array($type, $ioList['sg']) && !in_array($type, $ioList['s3'])) {
			$this->showJson(LNG('storeImport.main.ioNotSupErr').$store['driver'], false);
		}
		if (is_array($store['config'])) {	// 兼容旧版
			$store['config'] = json_encode($store['config']);
		}
		$check = Model('Storage')->checkConfig($store, true);
		if ($check !== true) {
			$this->showJson(LNG('storeImport.main.ioFromNetErr').$check, false);
		}
		if ($type != 'local') $this->in['hash'] = 0;

		// 1.2 检查目标目录
		$parse = KodIO::parse($pathTo);
		if ($parse['type'] != KodIO::KOD_SOURCE) {
			$this->showJson(LNG('storeImport.main.ioToErr'), false);
		}

		// 2. 开始导入
		$this->doImport($pathFrom, $pathTo);

		$data = Cache::get($taskId);
		$this->showJson($data, true, 1);
	}

	// 导入进度
	private function getProcess($pathFrom, $pathTo){
		$taskId = $this->_taskId();
		$task = Task::get($taskId);	// TODO task存在时为false,不存在为null,待确认
		if (!isset($this->in['process'])) {
			if ($task) $this->showJson(LNG('storeImport.task.rptErr'), false);
			Cache::remove($taskId);
			return;
		}
		if ($this->in['kill']) {
			if (!$task) $task = array('taskPercent' => 1);
			$task['desc'] = LNG('storeImport.task.stopByUser');
			$task['status'] = 'kill';
			Cache::set($taskId, $task);	// 避免process继续请求
			Task::kill($taskId);
			$this->showJson('Task killed.');
		}
		$info  = 0;
		$cache = Cache::get($taskId);
		if ($cache) {
			$task = $cache; $info = 1;
			Cache::remove($taskId);
		}
		$this->showJson($task, true, $info);
	}

	/**
	 * 存储导入
	 * @param [type] $pathFrom	{io:2}/oldpath	{io:2}=>/var/usr/data
	 * @param [type] $pathTo	{source:1}
	 * @return void
	 */
	public function doImport($pathFrom, $pathTo){
		ignore_timeout();
		Hook::bind('show_json',array($this,'showErrorCheck'));
		$parseFrom = KodIO::parse($pathFrom);
		$parseTo = KodIO::parse($pathTo);
		$this->ioType = $parseFrom['id'];	// 原存储io(id)
		// $checkHash = intval($this->in['hash']);

		// 0. 开始输出提示
		$text = LNG('storeImport.main.start').'from=>'.$pathFrom.'; to=>'.$pathTo;
		$this->webEcho($text);
		$this->cliEcho($pathFrom, $text, 0);

		// 1. 开始任务
		$taskId = $this->_taskId();
		$task	= new Task($taskId, 'storeImport', 0, LNG('storeImport.main.dataImport'));
		$info = $this->pathTotalInfo($pathFrom);
		$task->task['taskTotal'] = $info['total'];

		// 1.1 初始化:io_file缓存、日志文件
		$this->ioFileCache($pathFrom, 'init');	// 原目录io_file记录——缓存初始化
		$this->wrt256Log($pathFrom, $pathTo, 0);	// 路径长度超出,记录日志

		// 1.2 获取原目录文件列表
		$GLOBALS['STORE_IMPORT_FILE_CNT'] = 0;	// 指定目录下文件总数——动态变化;使用缓存的问题:数据落地不及时、保持key一致比较麻烦
		// 原存储driver ——$this->api()获取的driver无法随意调用父类方法,原因未知
		$driver = IO::init($pathFrom);
		$list = $this->api($pathFrom)->listAll($pathFrom);

		// TODO 中途中断不退出
		$GLOBALS['SHOW_JSON_NOT_EXIT'] = 1;

		// 2. 开始导入
		$fid  = 0;
		$this->pathIdx = $idx = 0;
		$rest = array('success'=>0, 'error'=>0);
		$sModel = Model('Source'); $fModel = Model('File');
        foreach ($list as $item) {
			$name  = get_path_this($item['path']);
			// $task->task['taskTotal'] = $GLOBALS['STORE_IMPORT_FILE_CNT'];
			$task->task['currentTitle'] = $name;
			$task->update(1);
			$this->pathIdx++;
			$this->cliEcho($pathFrom);

			// 每10000条刷新一下大小
			$idx++;
			if ($idx > 10000) {
				$idx = 0;
				$sModel->folderSizeResetChildren($parseTo['id']);
			}
			// TODO 可以考虑按完整层级存文件夹id,在此目录下新增文件/夹
			// $ccKey = $taskId.'_'.md5($item['path']);
			// if (Cache::get($ccKey)) continue;

			// 获取io_source.path、io_file.path
			$path  = $item['path'];						// 原数据绝对路径:/var/usr/data/oldpath/home/1.txt
            $fPath = $driver->getPathOuter($path);		// io_file.path:{io:2}/oldpath/home/1.txt
            $tPath = substr($fPath, strlen($pathFrom));	// 临时路径:/home/1.txt
            $sPath = $pathTo.'/'.trim($tPath,'/');		// io_source.path:{source:1}/home/1.txt;对目标目录而言,一级子目录为原数据指定目录下的一级子目录,即home

            // 1. 文件夹创建
            if ($item['folder'] == 1) {
                $res = IO::mkdir($sPath);
				$this->writeLog($res, $path, $rest, 'folder');
				// if ($res) Cache::set($ccKey, 1);
                continue;
            }

            // 2. 文件创建——添加io_source/io_file记录
			// 2.0 path过长,拦截
			if (mb_strlen($fPath) > 256) {
				$this->wrt256Log($pathFrom, $pathTo, $fPath);
				$this->writeLog(false, 'path length > 256.', $rest);
				// Cache::set($ccKey, 1);
				continue;
			}
            // 2.1 IO::mkfile($path)创建文件,得到sourceID——多一条io_file空文件记录
			// 判断io_file是否存在,不存在,执行完整流程;存在,检查io_source——存在且fileID相等,跳过;不存在,新建source记录并增加file引用数
			$find = $this->ioFileCache($fPath);	// [fileID=xx,linkCount=>1]
			if ($find) {
				$info = IO::infoFullSimple($sPath);
				if ($info && $info['fileID'] == $find['fileID']) {
					$this->writeLog(true, $path, $rest);
					// if ($res) Cache::set($ccKey, 1);
					continue;
				}
			}
			// 新建io_source记录
			$res = $sPath = IO::mkfile($sPath);
			if (!$this->writeLog($res, $path, $rest)) continue;
			if (!$fid) {	// 获取空文件fileID ——IO::mkfile持续引用此文件,完成后需重置引用数
				$info = IO::infoSimple($res);
				$fid  = _get($info, 'fileID', 0);
			}
			$parse = KodIO::parse($res);
            $sourceID = $parse['id'];

            // 2.2 io_file新增/更新,获得fileID
			$size = $item['size'];
			if ($find) {
				$data = array(
                    "size"      => $size,
                    "linkCount"	=> intval($find['linkCount']) + 1,
                );
                $res = $fModel->where(array("fileID"=>$find['fileID']) )->save($data);
                $fileID = $res ? $find['fileID'] : 0;
			} else {
				$data = array(
					"size" 		=> $size,
					"linkCount"	=> 1,
					"name"		=> $name,
					"ioType"	=> $this->ioType,
					"path" 		=> $fPath,
					"hashSimple"=> '',	// 置空,避免缩略图异常(KodIO::hashPath)
					"hashMd5" 	=> 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'	// 占位,避免触发追加md5任务
				);
				// if ($checkHash) {
				// 	$data['hashSimple'] = $driver->hashSimple($path);
				// 	$data['hashMd5'] = $driver->hashMd5($path);
				// }
				$fileID = $fModel->add($data);
			}
			if (!$fileID) {
				$this->writeLog($fileID, $path, $rest);
				IO::remove($sPath, false);	// io_file添加失败,删除该文件
				$rest['success']--;
				continue;
			}

            // 2.3 更新io_source.fileID/size
			$update = array('fileID'=>$fileID,'size'=>$size,'modifyTime'=>$item['modifyTime']);
            $res = $sModel->where(array("sourceID"=>$sourceID))->save($update);
			if (!$res) {
				$this->writeLog($fileID, $path, $rest);
				IO::remove($sPath, false);	// io_source更新失败,删除该文件
				$rest['success']--;
				continue;
			}
			// Cache::set($ccKey, 1);
        }
		$GLOBALS['SHOW_JSON_NOT_EXIT_DONE'] = 0;
		$GLOBALS['SHOW_JSON_NOT_EXIT'] = 0;

		// 3. 更新空文件记录引用数
		if ($fid) {
			$find	= $fModel->find($fid);
			$cnt	= (int) $sModel->where(array('fileID'=>$fid))->count();
			$update	= array('linkCount'	=> ($cnt ? $cnt : 1));
			$fModel->where(array('fileID'=>$fid))->save($update);
		}
		// 4. 更新目标目录大小
		$info = $sModel->pathInfo($parseTo['id'],true);
		$sModel->folderSizeResetChildren($info['sourceID']);
		if ($info['parentID']) {
			$sModel->folderSizeReset($info['parentID']);
		}

		// 5. 结束任务
		$task->task['taskPercent'] = 1;	// 强制设为1
		$task->task['desc'] = LNG('common.success').':'.$rest['success'].','.LNG('common.fail').':'.$rest['error'];
		$msg = LNG('admin.setting.dbCnt').':'.$task->task['taskTotal'] .','. $task->task['desc'];
		Cache::set($taskId, $task->task);
		$task->end();

		// 5.1 日志文件保存、io缓存清除
		$this->wrt256Log($pathFrom, $pathTo, 1);
		$this->ioFileCache($pathFrom, 'clear');
		
		// 5.2 提示输出:页面、cli
		$text = LNG('storeImport.main.end').'from=>'.$pathFrom.'; to=>'.$pathTo.'。'.$msg;
		$this->webEcho($text);
		$this->cliEcho($pathFrom, $msg, 1);

		return $rest;
	}

	// 获取指定目录信息=>size、total——含文件夹
	private function pathTotalInfo($path) {
		if (isset($this->totalInfo)) return $this->totalInfo;
		$info = IO::infoWithChildren($path);
		$size = _get($info, 'size', 0);
		$fileNum	= _get($info, 'children.fileNum');
		$folderNum	= _get($info, 'children.folderNum');
		$this->pathTotal = intval($fileNum) + intval($folderNum);
		$this->totalInfo = array(
			'size'  => _get($info, 'size', 0),
			'total' => $this->pathTotal
		);
		return $this->totalInfo;
	}

	/**
	 * io_file数据缓存
	 * @param [type] $path	{io:1}、{io:1}/a/b/c/1.txt
	 * @param string $act	init/set/clear
	 * @return void
	 */
	private function ioFileCache($path='', $act='get'){
		$baseKey = 'io_file_'.$this->_taskId();	// io_file_xxx
		// 清除缓存
		if ($act == 'clear') {
			$list = Cache::get($baseKey);
			if (!$list) return false;
			foreach ($list as $cckey) {
				Cache::remove($cckey);
			}
			Cache::remove($baseKey);
			return;
		}
		// 根据path获取缓存
		if ($act == 'get') {
			$list = Cache::get($baseKey);
			if (!$list) return false;
			$md5 = md5($path);
			foreach ($list as $cckey) {
				$cache = Cache::get($cckey);
				if (isset($cache[$md5])) return $cache[$md5];
			}
			return false;
		}
		// 初始化缓存
		if ($act != 'init' || !$path) return;
		$kcache = array();	// 分页键名数组缓存
		$model = Model('File');
		$page  = 1; $pageNum = 5000;
		$where = array('ioType' => $this->ioType, 'path' => array('like', $path.'/%'));	// {io:x}
		$list  = $model->where($where)->selectPage($pageNum,$page);
		while($list && $page <= $list['pageInfo']['pageTotal']){
			$cache = array();
			foreach ($list['list'] as $item) {
				$md5 = md5($item['path']);
				$cache[$md5] = array('fileID'=>$item['fileID'],'linkCount'=>$item['linkCount']);
			}
			if ($cache) {
				$cckey = $baseKey.'_p'.$page;	// io_file_xxx_p1
				Cache::set($cckey, $cache);
				$kcache[] = $cckey;
			}
			$page++;$list = $model->where($where)->selectPage($pageNum,$page);
		}
		if ($kcache) Cache::set($baseKey, $kcache);
	}

	// 路径长度超出256,写入日志
	private function wrt256Log($pathFrom, $pathTo, $path='') {
		// 1.初始化任务key
		if ($path === 0) {
			$this->logName = $this->_taskId().'-'.date('His');	// xxx-172027
			return;
		}
		// 2.写入日志
		if ($path !== 1) {
			$temp = explode('-', $this->logName);
			write_log('('.$temp[1].') '.$path, $this->pluginName, 'log-'.$this->logName);
			return;
		}
		// 3.日志文件移动到目标目录
		$log = false;
		$logPath = IO::mkdir($pathTo.'/'.LNG('storeImport.task.errLog'));

		$path = LOG_PATH.strtolower($this->pluginName);
		$list = IO::listPath($path,true);
		foreach ($list['fileList'] as $item) {
			if (!strstr($item['name'], $this->logName)) continue;
			$log = true;
			write_log('move error log to dest: '.$item['path'].'=>'.$logPath, $this->pluginName);
			IO::move($item['path'], $logPath);
		}
		if (!$log) IO::remove($logPath);
	}
	// 页面输出,记录日志
	private function webEcho($msg) {
		if (!isset($this->echoLog)) {
			$this->echoLog = intval($this->in['echoLog']);
		}
		if ($this->echoLog) echoLog($msg);
		write_log($msg, $this->pluginName);
	}
	// 错误日志
	private function writeLog($res, $path, &$rest, $type='file') {
		if ($this->echoLog) {
			$total = $GLOBALS['STORE_IMPORT_FILE_CNT'];
			echoLog('['.$this->pathIdx.'/'.$total.'].'.LNG('storeImport.main.import').LNG('common.'.($res ? 'success' : 'fail')).'('.$type.').'.$path, ($res ? true : false));
		}
		if ($res) {
			$rest['success']++; return true;
		}
		write_log('create '.$type.' error: '.$path, $this->pluginName);
		$rest['error']++; return false;
	}

	// 任务id
	private function _taskId($pathFrom='', $pathTo='') {
		static $taskId;
		if (!$taskId) {
			$pathFrom = trim($this->in['pathFrom'], '/');
			$pathTo	  = trim($this->in['pathTo'], '/');
			$taskId	  = md5('store_import_'.$pathFrom.'_'.$pathTo);
		}
		return $taskId;
	}

	// 获取主任务show_json错误,更新任务
	public function showErrorCheck($result){
		$this->showJsonError = true;
		if(!is_array($result)) return $this->showJson($result);
		if($result['code'] == true || $result['code'] == 1) return $this->showJson($result);

		$pathFrom = trim($this->in['pathFrom'], '/');
        $pathTo	  = trim($this->in['pathTo'], '/');
		$errMsg	  = is_string($result['data']) ? $result['data'] : LNG('storeImport.task.stopErr');
		if ($pathFrom && $pathTo) {
			$taskId	  = $this->_taskId();
			$task	  = Task::get($taskId);
			if (!$task) $task = array('taskPercent' => 1);
			$task['desc'] = $errMsg;
			$task['status'] = 'error';
			Cache::set($taskId, $task);
			Task::kill($taskId);
		}
		$text = array(
		    LNG('storeImport.task.stopErrDesc').'from=>'.$pathFrom.'; to=>'.$pathTo,
		    $this->in,
		    $result,
		    get_caller_info()
		);
		write_log($text, $this->pluginName);
		return $this->showJson($result);
	}

	// 终端执行进度消息
	private function cliProMsg(){
		$idx = $this->pathIdx;		// 当前文件数
		$cnt = $this->pathTotal;	// 文件总数
		$pct = $idx / $cnt;

		// 已用时间(秒)
		$timeUse = timeFloat() - $this->timeStart;
		// 估算总时间 = 已用时间 / 进度百分比
		$timeTotal = $timeUse / $pct;
		// 剩余时间 = 估算总时间 - 已用时间
		$timeRem = $timeTotal - $timeUse;

		$now = str_pad($idx, strlen($cnt), ' ', STR_PAD_LEFT);	// 占位,避免内容抖动
		$rto = str_pad(round($pct * 100, 1), 5, ' ', STR_PAD_LEFT);
		return '('.$now.'/'.$cnt.')'.$rto.'% | '.LNG('storeImport.task.afterTime').$this->timeNeedFormat($timeRem);
	}
	// 剩余时间(s)格式化
	private function timeNeedFormat($sTime=0) {
		$h = floor($sTime / 3600);	// 小时数
		$m = floor(($sTime % 3600) / 60);	// 剩余分钟数
		$s = $sTime % 60;	// 剩余秒数
		$time = '';
		if ($h > 0) $time .= $h . LNG('common.hour');
		if ($m > 0) $time .= $m . LNG('common.minute');
		if ($s > 0 || ($h == 0 && $m == 0)) {
			$time .= $s . LNG('common.second');
		}
		return $time;
	}
	private function cliEcho($pathFrom, $msg='',$code=false) {
		if (!$this->isCli) return;
		// 3.任务结束
		if ($code === 1) {
			echo PHP_EOL.LNG('explorer.success').$msg.PHP_EOL;exit;
		}
		// 2.进度输出:( 23/999) 2.3% | 剩余时间:1小时15分32秒
		if ($code === false) {
			static $idx = 0;
			$idx++;
			// 非末位且小于指定范围的随机值,则不输出——用时间间隔效果更好,但timeFloat相对消耗更多
			if ($this->pathIdx < $this->pathTotal && $idx < mt_rand(50,100)) return;
			$idx = 0;

			echo "\r";	// 清除当前行
			echo str_pad($this->cliProMsg(), 100);	// 占位,避免未替换掉过长的内容
			ob_flush(); flush();	// 强制刷新输出
			return;
		}
		// 1.任务开始
		ob_implicit_flush(true);	// 禁用输出缓冲

		// 1.1 输出标题
		// 存储导入数据:from=>{io:xx}; to=>{source:xx}
		// 准备导入... => 正在导入...(35.4GB)
		echo str_replace(LNG('storeImport.task.startExt'), '', $msg).PHP_EOL;
		echo LNG('storeImport.task.starting');
		ob_flush(); flush();
		
		// 1.2 统计文件总数
		$info = $this->pathTotalInfo($pathFrom);
		$size = size_format($info['size']);
		$this->timeStart = timeFloat();

		echo "\r";
		echo LNG('storeImport.task.ing').'('.str_pad($size,3).')'.PHP_EOL;
	}

	// 终端执行导入
	// TODO ctrl+c不会杀掉任务(但似乎暂停中),可以监听主动杀掉
	public function cliImport(){
		$this->isCli = true;
		if (!is_cli() || !isset($_SERVER['argv'])) {
			$this->showJson(LNG('common.illegalRequest'), false);
		}
		$args = $this->cliArgs();
		if (!$args || empty($args['pathFrom']) || empty($args['pathTo'])) {
			$this->showJson(LNG('common.invalidParam'), false);
		}
		if (!KodUser::isLogin()) $this->showJson(LNG('storeImport.main.loginTokenErr'),false);
		if (!KodUser::isRoot()) $this->showJson(LNG('storeImport.main.noPermission'),false);
		$this->in['pathFrom'] = $args['pathFrom'];
		$this->in['pathTo'] = $args['pathTo'];
		$this->start();
	}
	private function cliArgs($key=false){
        $argv = $_SERVER['argv'];
        if (!is_array($argv)) return array();
		array_shift($argv);
    	array_shift($argv);	// 移除前2个参数:./index.php、admin/storage/import&accessToken=xxx
    	$name = null;
		$args = array();
		foreach ($argv as $arg) {
			if (!substr($arg, 0, 2) == '--') continue;
			$tmp = explode('-', ltrim($arg, '--'));
			$args[$tmp[0]] = isset($tmp[1]) ? $tmp[1] : '';
		}
		return $args;
    }
	
	// show_json,兼容终端执行
	public function showJson($data,$code=true,$info=null) {
		// web
		if (!$this->isCli) {
			if ($this->showJsonError) return $data;
			show_json($data, $code, $info);
		}
		// cli
		$lf = '';	// 换行
		if ($this->showJsonError) {
			$lf = PHP_EOL;
			if (!is_array($data)) {
				$code = false;
			} else {
				$data = _get($data, 'data', '');
				$code = _get($data, 'code'); 
			}
			if (!$data || !is_string($data)) $data = json_encode($data);
		}
		$msg = $code ? LNG('explorer.success') : LNG('explorer.error');
		echo $lf.$msg.$data.PHP_EOL;exit;
	}

}
package.json
wget 'https://sme10.lists2.roe3.org/kodbox/plugins/storeImport/package.json'
View Content
{
	"id":"storeImport",
	"name":"{{LNG['storeImport.meta.name']}}",
	"title":"{{LNG['storeImport.meta.title']}}",
	"version":"1.08",
	"category":"tools",
	"hidden":1,
	"source":{
		"className":"font-icon ri-folder-transfer-line bg-blue-6"
	},
	"description":"{{LNG['storeImport.meta.desc']}}",
	"auther":{
		"copyright":"kodcloud.",
		"homePage":"http://www.kodcloud.com",
	},
	"configItem":{
		"pluginAuth":{
			"type":"userSelect",
			"value":{"user":1},
			"className":"hidden",
			"display":"{{LNG['admin.plugin.auth']}}",
			"desc":"{{LNG['admin.plugin.authDesc']}}",
			"require":1
		},
		"sep001":{
			// "display":"使用说明",
			"value":
			"<div class='info-alert info-alert-blue p-10 align-left can-select can-right-menu'>
			<li class='mb-5'>通过对指定磁盘或对象存储添加的存储进行扫描,自动构建索引,将原始数据映射到网盘目录,实现存储数据快速导入。</li>
			<li>使用说明见 <a href='./#admin/storage/index' target='_blank'>存储管理</a> -【数据导入】。</li>
			</div>"
		}
	}
}