10 code examples of PHP ZenCodex\ComposerMirror\Log extracted from open source projects
/**
* 更新 packages.json
* @param $config
*/
public function flushFiles($config)
{
$app = App::getInstance();
$cachedir = $config->cachedir;
$packages = json_decode(file_get_contents($cachedir . 'packages.json.new'));
$packages->mirrors = [
[
'dist-url' => $config->distUrl . '%package%/%reference%.%type%',
'preferred' => true,
],
];
$packages->update_at = date('Y-m-d H:i:s', $app->timestamp);
file_put_contents($config->cachedir . 'packages.json', json_encode($packages));
unlink($config->cachedir . 'packages.json.new');
App::pushJob2Task($config->cachedir . 'packages.json');
Log::debug('finished! flushFiles...');
}
/*
|--------------------------------------------------------------------------
| Main()
|--------------------------------------------------------------------------
|
| 核心采集代码
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$config = App::getConfig();
// 检测并生成必要目录
file_exists($config->cachedir) or mkdir($config->cachedir, 0777, true);
file_exists($config->distdir) or mkdir($config->distdir, 0777, true);
$signal_handler = function ($signal) {
Log::warn('kill signal, please wait ...');
App::getInstance()->terminated = 1;
};
pcntl_signal(SIGINT, $signal_handler); // Ctrl + C
pcntl_signal(SIGCHLD, $signal_handler);
pcntl_signal(SIGTSTP, $signal_handler); // Ctrl + Z
// 临时 workaround
@exec('rm -f /tmp/composer_*');
if (App::getInstance()->isPush2Cloud) {
// 初始化 producer
$clientHandler = App::getClientHandler();
$stats = $clientHandler->stats();
if (intval($stats['current-jobs-ready']) > 0) {
Log::warn('还有未完成的jobs,继续等待');
sleep(30);
exit();
}
} else {
Log::warn('NOTE: WOULD NOT SYNC TO CLOUD');
}
// STEP 1
$providers = $this->downloadProviders($config);
if (FileUtils::badCountOfProviderPackages() !== 0) {
throw new \RuntimeException('!!! flushFiles => packages.json.new 存在错误,跳过更新 !!!');
}
// STEP 2
$jsonfiles = $this->downloadPackages($config, $providers);
// STEP 3
$this->downloadZipballs($config, $jsonfiles);
// STEP 4
$this->flushFiles($config);
//unset($globals->expiredManager);
Log::warn('wait for 60s....');
sleep(60);
}
protected function badCountOfProviderPackages($baseFile = 'packages.json.new')
{
Log::info('------------- ' . __FUNCTION__ . ' -------------');
$cachedir = $this->config->cachedir;
$packagejson = json_decode(file_get_contents($cachedir . $baseFile));
$i = $j = 0;
foreach ($packagejson->{'provider-includes'} as $tpl => $provider) {
$providerjson = str_replace('%hash%', $provider->sha256, $tpl);
$file = $cachedir . $providerjson;
if (!file_exists($file)) {
Log::error('LOST FILE => ' . $file);
$i++;
} elseif ($provider->sha256 !== hash_file('sha256', $file)) {
Log::error('HASH ERROR => ' . $file);
$i++;
// 删除错误文件,重新下载
unlink($file);
} else {
$j++;
}
}
Log::info($i . ' / ' . ($i + $j));
return $i;
}
/**
* check sha256
*/
protected function badCountOfAllPackages()
{
Log::info('------------- ' . __FUNCTION__ . ' -------------');
$app = App::getInstance();
$cachedir = $this->config->cachedir;
$packagejson = json_decode(file_get_contents($cachedir . 'packages.json'));
$i = $j = 0;
$errCount = $touchCount = $allCount = 0;
foreach ($packagejson->{'provider-includes'} as $tpl => $provider) {
$providerjson = str_replace('%hash%', $provider->sha256, $tpl);
$packages = json_decode(file_get_contents($cachedir . $providerjson));
$progressBar = new ProgressBarManager(0, count((array) $packages->providers));
$progressBar->setFormat("$tpl : %current%/%max% [%bar%] %percent%%");
foreach ($packages->providers as $tpl2 => $sha) {
$file = $cachedir . "p/$tpl2\$$sha->sha256.json";
if (!file_exists($file)) {
Log::error('LOST FILE => ' . $file);
$i++;
} elseif ($sha->sha256 !== hash_file('sha256', $file)) {
Log::error('HASH ERROR => ' . $file);
$i++;
unlink($file);
} else {
$j++;
}
// 检测zip包
$originContent = file_get_contents($file);
$packageData = json_decode($originContent, true);
foreach ($packageData['packages'] as $packageName => $versions) {
foreach ($versions as $verNumber => $vMeta) {
// 废弃的包 dist url 为null,跳过不处理
// bananeapocalypse/nuitinfo2013api
// This package is abandoned and no longer maintained. No replacement package was suggested.
if (!$vMeta['dist']['url']) {
// Log::error('发现异常包,跳过: ' . $vMeta['dist']['url']);
$errCount++;
continue;
}
// 保存 github/bitbucket ... 真实对应下载地址
$zipFile = $this->config->distdir . $packageName . '/' . $vMeta['dist']['reference'] . '.zip';
if (!file_exists($zipFile)) {
$this->storeFile($zipFile, $vMeta['dist']['url']);
if (!$this->config->cloudsync) {
continue;
}
$app->pushJob2Task($zipFile);
} else {
$this->touchFile($zipFile, $app->timestamp);
$touchCount++;
}
$allCount++;
}
}
$progressBar->advance();
}
}
// 保存时间
$line = implode(',', [$app->timestamp, $errCount, $touchCount, $allCount]);
file_put_contents($this->config->dbdir . 'touchall.log', $line . PHP_EOL, FILE_APPEND);
Log::info($i . ' / ' . ($i + $j));
return $i;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$cloud = App::getInstance()->getCloud();
$this->wp = new WorkerPool();
$this->wp->setWorkerPoolSize(10)->create(new ClosureWorker(
function ($jobData, $semaphone, $storage) use ($cloud) {
$cloud->{$jobData->method}($jobData->data);
}
));
// $isExit = false;
// $signal_handler = function ($signal) use(&$isExit) {
// $this->warn("kill signal, please wait for all works done");
// $isExit = true;
// };
//
// pcntl_signal(SIGINT, $signal_handler); // Ctrl + C
// pcntl_signal(SIGCHLD, $signal_handler);
// pcntl_signal(SIGTSTP, $signal_handler); // Ctrl + Z
$beanstalk = App::getInstance()->getClientHandler();
$beanstalk->watch('composer');
$stats = $beanstalk->stats();
Log::info('current-jobs-ready => ' . $stats['current-jobs-ready']);
Log::info('current-jobs-reserved => ' . $stats['current-jobs-reserved']);
Log::info('current-jobs-buried => ' . $stats['current-jobs-buried']);
while (1) {
$job = $beanstalk->reserve(15); // Block until job is available.
if (!$job) {
break;
}
$jobData = json_decode($job->getData());
if (!method_exists($cloud, $jobData->method)) {
throw new \RuntimeException('找不到此方法 => ' . $jobData->method);
}
// 只处理最后一个 packages.json,其余的多进程处理
if ($this->isMainPackageFile($jobData)) {
$this->processMainPackageFile($jobData);
} else {
$this->wp->run($jobData);
}
try {
$beanstalk->delete($job);
} catch (Exception $e) {
// Noting to do
}
}
Log::info('DONE!');
}
/**
* 更新 packages.json
* @param $config
*/
public function flushFiles($config)
{
$app = App::getInstance();
$cachedir = $config->cachedir;
$packages = json_decode(file_get_contents($cachedir . 'packages.json.new'));
/* $packages->mirrors = [
[
'dist-url' => $config->distUrl . '%package%/%reference%.%type%',
'preferred' => true,
]
]; */
$packages->update_at = date('Y-m-d H:i:s', $app->timestamp);
$prefix = parse_url($config->mirrorUrl)['path'];
$packages->{'providers-url'} = $prefix . $packages->{'providers-url'};
file_put_contents($config->cachedir . 'packages.json.new', json_encode($packages));
// replace the file atomically
rename($config->cachedir . 'packages.json.new', $config->cachedir . 'packages.json');
App::pushJob2Task($config->cachedir . 'packages.json');
Log::debug('finished! flushFiles...');
}
/**
* composer.json downloader
*/
public function downloadPackages($config, $providers)
{
$cachedir = $config->cachedir;
$i = 1;
$numberOfProviders = count($providers);
$jsonfiles = [];
$packageObjs = [];
$app = App::getInstance();
$sum = 0;
foreach ($providers as $providerjson) {
$list = json_decode(file_get_contents($providerjson));
if (!$list || empty($list->providers)) {
continue;
}
$list = $list->providers;
// $all = count((array)$list);
// $progressBar = new ProgressBarManager(0, $all);
echo " - Provider {$i}/{$numberOfProviders}:\n";
// $progressBar->setFormat(" - Package: %current%/%max% [%bar%] %percent%%");
foreach ($list as $packageName => $provider) {
$app->terminated and exit();
if (in_array($packageName, self::IGNORE_PACKAGES)) {
Log::warn("Ignore package: $packageName");
continue;
}
// $progressBar->advance();
$sum++;
$url = "$config->packagistUrl/p/$packageName\$$provider->sha256.json";
$cachefile = $cachedir . str_replace("$config->packagistUrl/", '', $url);
if (file_exists($cachefile)) {
FileUtils::touchFile($cachefile, $app->timestamp);
continue;
}
$packageObjs[] = (object) [
'packageName' => $packageName,
'url' => $url,
'sha256' => $provider->sha256,
];
}
$i++;
}
Log::warn("Total packages count = $sum");
// 开始下载
$arrChuncks = array_chunk($packageObjs, $config->maxConnections);
$progressBar = new ProgressBarManager(0, count($packageObjs));
$progressBar->setFormat(' - New Packages: %current%/%max% [%bar%] %percent%%');
$client = new Client([RequestOptions::TIMEOUT => $config->timeout]);
foreach ($arrChuncks as $chunk) {
$requests = [];
foreach ($chunk as $package) {
App::getInstance()->terminated and exit();
$req = new Request('GET', $package->url);
$req->sha256 = $package->sha256;
$req->packageName = $package->packageName;
$requests[] = $req;
}
$pool = new Pool($client, $requests, [
'concurrency' => $config->maxConnections,
'fulfilled' => function ($res, $index) use (&$jsonfiles, &$requests, $progressBar) {
$config = App::getConfig();
$req = $requests[$index];
$cachedir = $config->cachedir;
$progressBar->advance();
if (200 !== $res->getStatusCode() || $req->sha256 !== hash('sha256', (string) $res->getBody())) {
Log::error("\t sha256 wrong => " . $req->getUri());
return;
}
$cachefile = $cachedir . str_replace("$config->packagistUrl/", '', $req->getUri());
// $cachefile2 = $cachedir . '/p/' . $req->packageName . '.json';
$jsonfiles[] = $cachefile;
// if ($glob = glob("{$cachedir}p/$req->packageName\$*")) {
// foreach ($glob as $old) {
// $globals->expiredManager->add($old, time());
// }
// }
FileUtils::storeFile($cachefile, (string) $res->getBody());
App::pushJob2Task($cachefile);
},
'rejected' => function ($reason, $index) use (&$requests, &$progressBar) {
Log::error($requests[$index]->getUri() . ' => failed');
$progressBar->advance();
},
]);
$pool->promise()->wait();
}
return $jsonfiles;
}
/*
|--------------------------------------------------------------------------
| Main()
|--------------------------------------------------------------------------
|
| 核心采集代码
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$config = App::getConfig();
// 检测并生成必要目录
file_exists($config->cachedir) or mkdir($config->cachedir, 0777, true);
file_exists($config->distdir) or mkdir($config->distdir, 0777, true);
$signal_handler = function ($signal) {
Log::warn('kill signal, please wait ...');
App::getInstance()->terminated = 1;
};
pcntl_signal(SIGINT, $signal_handler); // Ctrl + C
pcntl_signal(SIGCHLD, $signal_handler);
pcntl_signal(SIGTSTP, $signal_handler); // Ctrl + Z
// 临时 workaround
@exec('rm -f /tmp/composer_*');
if (App::getInstance()->isPush2Cloud) {
// 初始化 producer
$clientHandler = App::getClientHandler();
$stats = $clientHandler->stats();
if (intval($stats['current-jobs-ready']) > 0) {
Log::warn('还有未完成的jobs,继续等待');
sleep(30);
exit();
}
} else {
Log::warn('NOTE: WOULD NOT SYNC TO CLOUD');
}
// STEP 1
$providers = $this->downloadProviders($config);
if (FileUtils::badCountOfProviderPackages() !== 0) {
throw new \RuntimeException('!!! flushFiles => packages.json.new 存在错误,跳过更新 !!!');
}
// STEP 2
$jsonfiles = $this->downloadPackages($config, $providers);
// STEP 3
// $this->downloadZipballs($config, $jsonfiles);
// STEP 4
$this->flushFiles($config);
//unset($globals->expiredManager);
Log::warn('wait for 60s....');
sleep(60);
}
public function clearCloudDiffFiles()
{
$config = App::getConfig();
$cloud = new Cloud($config);
$i = $index = $startfrom = 0;
if (file_exists($config->dbdir . Rainbow::DIST_URI_MAP . '.log')) {
$startfrom = intval(file_get_contents($config->dbdir . Rainbow::DIST_URI_MAP . '.log'));
}
$mapUris = file($config->dbdir . Rainbow::DIST_URI_MAP, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
foreach ($mapUris as $uri) {
$index++;
if ($index < $startfrom) {
continue;
}
$zipFile = $config->distdir . ltrim($uri, '/');
if (!file_exists($zipFile)) {
Log::info('local not found => ' . $zipFile);
$cloud->removeRemoteFile($zipFile);
file_put_contents($config->dbdir . Rainbow::DIST_URI_MAP . '.log', $index);
$i++;
}
}
Log::warn("$i files removed from cloud");
}
// 将文件上传到 又拍云
public function pushOneFile($file)
{
Log::info($file);
if (!file_exists($file)) {
throw new \RuntimeException('pushOneFile, Not found => ' . $file);
}
// default value for return
$ret = -1;
if (count($this->lastUpload) > 1000) {
$this->lastUpload = [];
}
// fix issue: {"msg":"too many requests of the same uri","code":42900002 }
// sleep to wait
if (isset($this->lastUpload[$file]) && (time() - intval($this->lastUpload[$file]) < 30)) {
Log::warn('wait 10s, workaround => too many requests of the same uri');
sleep(30);
}
[$ext, $uri, $tmpfile] = $this->pickFileInfo($file);
if (empty($tmpfile)) {
goto __END__;
}
try {
$f = fopen($tmpfile, 'rb');
// 根据扩展名指定bucket,上传到又拍云
$this->cloudDisk($ext)->writeStream($uri, $f);
Log::debug('pushOneFile success => ' . $file);
$ret = 1;
} catch (\Exception $e) {
Log::error("pushOneFile => $file \n" . $e->getMessage());
}
__END__:
$ext == 'zip' and file_exists($tmpfile) and unlink($tmpfile);
return $ret;
}