ResumeUploader.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. <?php
  2. namespace Qiniu\Storage;
  3. use Qiniu\Config;
  4. use Qiniu\Http\Client;
  5. use Qiniu\Http\Error;
  6. use Qiniu\Enum\SplitUploadVersion;
  7. use Qiniu\Http\RequestOptions;
  /**
   * Resumable (breakpoint) uploader. This class mainly implements the chunked
   * part of a resumable upload, together with the matching "create block" and
   * "create file" steps.
   *
   * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkblk.html
   * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkfile.html
   */
  final class ResumeUploader
  {
      private $upToken;
      private $key;
      private $inputStream;
      private $size;
      private $params;
      private $mime;

      /**
       * v1 progress: block index => array('ctx' => ..., 'expiredAt' => ...).
       *
       * @var array
       */
      private $contexts;

      /**
       * v2 progress with the keys 'etags', 'uploadId', 'expiredAt' and 'uploaded'.
       *
       * @var array
       */
      private $finishedEtags;

      private $host;
      private $bucket;

      /**
       * URL of the most recent request; used when building Error objects.
       *
       * @var string
       */
      private $currentUrl;

      private $config;
      private $resumeRecordFile;
      private $version;
      private $partSize;

      /**
       * @var RequestOptions
       */
      private $reqOpt;

      /**
       * Error reported by \Qiniu\explodeUpToken() in the constructor, or null.
       * A constructor cannot return a value, so the error is kept here and
       * handed back by upload().
       *
       * @var mixed|null
       */
      private $initError = null;

      /**
       * Prepare a resumable upload of a binary stream to Qiniu.
       *
       * @param string $upToken upload token
       * @param string $key target file name (object key)
       * @param resource $inputStream stream to upload
       * @param int $size number of bytes to upload from the stream
       * @param array<string, string> $params custom variables
       * @param string $mime mime type of the uploaded data
       * @param Config $config
       * @param string $resumeRecordFile file in which the already-uploaded parts are recorded
       * @param string $version multipart upload version, v1 or v2 (default v1)
       * @param int $partSize part size; falls back to Config::BLOCK_SIZE when empty
       * @param RequestOptions $reqOpt options passed to the host lookup and to every HTTP request
       * @throws \Exception when the version is not supported or the upload host lookup fails
       *
       * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
       */
      public function __construct(
          $upToken,
          $key,
          $inputStream,
          $size,
          $params,
          $mime,
          $config,
          $resumeRecordFile = null,
          $version = 'v1',
          $partSize = Config::BLOCK_SIZE,
          $reqOpt = null
      ) {
          $this->upToken = $upToken;
          $this->key = $key;
          $this->inputStream = $inputStream;
          $this->size = $size;
          $this->params = $params;
          $this->mime = $mime;
          $this->contexts = array();
          $this->resetFinishedEtags();
          $this->config = $config;
          $this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
          $this->partSize = $partSize ? $partSize : Config::BLOCK_SIZE;

          if ($reqOpt === null) {
              $reqOpt = new RequestOptions();
          }
          $this->reqOpt = $reqOpt;

          try {
              $this->version = SplitUploadVersion::from($version ? $version : 'v1');
          } catch (\Exception $e) {
              throw new \Exception("only support v1/v2 now!", 0, $e);
          }

          list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
          $this->bucket = $bucket;
          if ($err != null) {
              // The value of a `return` inside a constructor is discarded, so the
              // error is stored and reported by upload() instead of being lost.
              $this->initError = $err;
              return;
          }

          list($upHost, $err) = $config->getUpHostV2($accessKey, $bucket, $reqOpt);
          if ($err != null) {
              throw new \Exception($err->message(), 1);
          }
          $this->host = $upHost;
      }

      /**
       * Run the upload, resuming from the record file when one is configured.
       *
       * @param string $fname file name
       * @return array array($ret, $err); $err is null on success
       *
       * @throws \Exception
       */
      public function upload($fname)
      {
          if ($this->initError !== null) {
              return array(null, $this->initError);
          }

          $blkputRets = $this->loadResumeRecord();

          // Loose comparison on purpose: $this->version is whatever
          // SplitUploadVersion::from() returned, compared against the constants.
          if ($this->version == SplitUploadVersion::V1) {
              return $this->uploadV1($fname, $blkputRets);
          } elseif ($this->version == SplitUploadVersion::V2) {
              return $this->uploadV2($fname, $blkputRets);
          } else {
              throw new \Exception("only support v1/v2 now!");
          }
      }

      /**
       * Read and decode the resume record file.
       *
       * Every failure is logged with error_log() and treated as "no record".
       *
       * @return array|null decoded record, or null when there is nothing usable
       */
      private function loadResumeRecord()
      {
          if ($this->resumeRecordFile == null) {
              return null;
          }
          if (!file_exists($this->resumeRecordFile)) {
              error_log("resumeFile not exists");
              return null;
          }

          // file_get_contents() leaves no handle to close on any path and does
          // not depend on a possibly cached filesize().
          $contents = file_get_contents($this->resumeRecordFile);
          if ($contents === false) {
              error_log("resumeFile open failed");
              return null;
          }
          if ($contents === '') {
              error_log("resumeFile is empty");
              return null;
          }

          $record = json_decode($contents, true);
          if (!is_array($record)) {
              error_log("resumeFile contents decode error");
              return null;
          }
          return $record;
      }

      /**
       * Write the progress record; a no-op when no record file is configured.
       * Failures are logged and otherwise ignored (best effort).
       *
       * @param array $record progress data to persist
       */
      private function saveResumeRecord($record)
      {
          if ($this->resumeRecordFile === null) {
              return;
          }
          $recordData = json_encode($record);
          if (!$recordData) {
              error_log('resumeRecordData encode failed');
              return;
          }
          if (file_put_contents($this->resumeRecordFile, $recordData) === false) {
              error_log("write resumeRecordFile failed");
          }
      }

      /**
       * Delete the progress record (best effort); a no-op when none is configured.
       */
      private function removeResumeRecord()
      {
          if ($this->resumeRecordFile !== null) {
              @unlink($this->resumeRecordFile);
          }
      }

      /**
       * Reset the v2 progress state to "nothing uploaded".
       */
      private function resetFinishedEtags()
      {
          $this->finishedEtags = array("etags" => array(), "uploadId" => "", "expiredAt" => 0, "uploaded" => 0);
      }

      /**
       * Read up to $length bytes from the input stream.
       *
       * stream_get_contents() keeps reading until $length bytes or EOF, whereas
       * a single fread() may return less on streams that are not plain files.
       *
       * @param int $length number of bytes wanted
       * @return string
       *
       * @throws \Exception when the read fails
       */
      private function readBlock($length)
      {
          $data = stream_get_contents($this->inputStream, $length);
          if ($data === false) {
              throw new \Exception("file read failed", 1);
          }
          return $data;
      }

      /**
       * Point $this->host at the backup upload host.
       *
       * @return mixed|null the error from the token parser or host lookup, null on success
       */
      private function switchToBackupHost()
      {
          list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
          if ($err != null) {
              return $err;
          }
          list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket, $this->reqOpt);
          if ($err != null) {
              return $err;
          }
          $this->host = $upHostBackup;
          return null;
      }

      /**
       * Upload with the v1 protocol (mkblk per block, then mkfile).
       *
       * @param string $fname file name
       * @param null|array $blkputRets decoded resume record, if any
       * @return array array($ret, $err)
       *
       * @throws \Exception
       */
      private function uploadV1($fname, $blkputRets = null)
      {
          // Try to restore the data that was already uploaded.
          $isResumeUpload = $blkputRets !== null;
          $this->contexts = array();
          if ($blkputRets
              && isset($blkputRets['contexts']) && isset($blkputRets['uploaded'])
              && is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])
          ) {
              $this->contexts = array_map(function ($ctx) {
                  if (is_array($ctx)) {
                      return $ctx;
                  }
                  // Older records stored the bare ctx without an expiry.
                  return array(
                      "ctx" => $ctx,
                      "expiredAt" => 0,
                  );
              }, $blkputRets['contexts']);
          }

          // Upload the blocks.
          $uploaded = 0;
          while ($uploaded < $this->size) {
              $blockSize = $this->blockSize($uploaded);
              $blockIndex = $uploaded / $this->partSize;
              if (!is_int($blockIndex)) {
                  throw new \Exception("v1 part size changed");
              }

              // Skip a block that is already uploaded and has not expired.
              // isset() on both keys keeps a malformed entry from raising a
              // notice; such an entry is simply uploaded again.
              if (isset($this->contexts[$blockIndex]['ctx'], $this->contexts[$blockIndex]['expiredAt'])
                  && $this->contexts[$blockIndex]['expiredAt'] >= time()
              ) {
                  $uploaded += $blockSize;
                  fseek($this->inputStream, $blockSize, SEEK_CUR);
                  continue;
              }

              $data = $this->readBlock($blockSize);
              $crc = \Qiniu\crc32_data($data);

              $response = $this->makeBlock($data, $blockSize);
              $ret = null;
              if ($response->ok() && $response->json() != null) {
                  $ret = $response->json();
              }

              // A negative status code makes the code fall back to the backup host.
              if ($response->statusCode < 0) {
                  $err = $this->switchToBackupHost();
                  if ($err != null) {
                      return array(null, $err);
                  }
              }

              // One retry on a retryable response or a checksum mismatch.
              if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
                  $response = $this->makeBlock($data, $blockSize);
                  $ret = $response->json();
              }
              if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
                  return array(null, new Error($this->currentUrl, $response));
              }

              // Store by block index: this replaces an expired entry and adds a
              // new one at the right position in a single step.
              $this->contexts[$blockIndex] = array(
                  'ctx' => $ret['ctx'],
                  'expiredAt' => $ret['expired_at'],
              );
              $uploaded += $blockSize;

              // Record the breakpoint.
              $this->saveResumeRecord(array(
                  'contexts' => $this->contexts,
                  'uploaded' => $uploaded
              ));
          }

          // Finish the upload.
          list($ret, $err) = $this->makeFile($fname);
          if ($err !== null) {
              $response = $err->getResponse();
              // On status 701 for a resumed upload, start over once from the
              // beginning of the stream without the record.
              if ($isResumeUpload && $response->statusCode === 701) {
                  fseek($this->inputStream, 0);
                  return $this->uploadV1($fname);
              }
          }
          return array($ret, $err);
      }

      /**
       * Tell whether a decoded resume record can be used to continue a v2 upload.
       *
       * @param mixed $record decoded resume record
       * @return bool
       */
      private function isUsableV2Record($record)
      {
          return is_array($record)
              && isset($record["etags"], $record["uploadId"], $record["expiredAt"], $record["uploaded"])
              && is_array($record["etags"])
              && is_string($record["uploadId"])
              && is_int($record["expiredAt"])
              && is_int($record["uploaded"])
              && $record["expiredAt"] > time()
              && $record["uploaded"] > 0;
      }

      /**
       * Upload with the v2 protocol (init, upload parts, complete).
       *
       * @param string $fname file name
       * @param null|array $blkputRets decoded resume record, if any
       * @return array array($ret, $err)
       *
       * @throws \Exception
       */
      private function uploadV2($fname, $blkputRets = null)
      {
          $uploaded = 0;
          $partNumber = 1;
          // Same "is a key set" test as fileUrl(), so the key "0" is not dropped.
          $encodedObjectName = ($this->key != null) ? \Qiniu\base64_urlSafeEncode($this->key) : '~';
          $isResumeUpload = $blkputRets !== null;

          // Start from a clean state. This method re-enters itself on status 612,
          // and parts recorded under the previous uploadId must not be sent to
          // the complete call of the new one.
          $this->resetFinishedEtags();

          // Initialise the upload id, either from the record or from the server.
          $err = null;
          if ($this->isUsableV2Record($blkputRets)) {
              $this->finishedEtags['etags'] = $blkputRets["etags"];
              $this->finishedEtags["uploadId"] = $blkputRets["uploadId"];
              $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"];
              $this->finishedEtags["uploaded"] = $blkputRets["uploaded"];
              $uploaded = $blkputRets["uploaded"];
              $partNumber = count($this->finishedEtags["etags"]) + 1;
          } else {
              $err = $this->makeInitReq($encodedObjectName);
          }
          if ($err != null) {
              return array(null, $err);
          }

          // Upload the parts.
          fseek($this->inputStream, $uploaded);
          while ($uploaded < $this->size) {
              $blockSize = $this->blockSize($uploaded);
              $data = $this->readBlock($blockSize);
              $md5 = md5($data);

              $response = $this->uploadPart(
                  $data,
                  $partNumber,
                  $this->finishedEtags["uploadId"],
                  $encodedObjectName,
                  $md5
              );
              $ret = null;
              if ($response->ok() && $response->json() != null) {
                  $ret = $response->json();
              }

              // A negative status code makes the code fall back to the backup host.
              if ($response->statusCode < 0) {
                  $err = $this->switchToBackupHost();
                  if ($err != null) {
                      return array(null, $err);
                  }
              }

              // One retry on a retryable response or a checksum mismatch.
              if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) {
                  $response = $this->uploadPart(
                      $data,
                      $partNumber,
                      $this->finishedEtags["uploadId"],
                      $encodedObjectName,
                      $md5
                  );
                  $ret = $response->json();
              }

              // On status 612 for a resumed upload, start over once without the record.
              if ($isResumeUpload && $response->statusCode === 612) {
                  return $this->uploadV2($fname);
              }
              if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) {
                  return array(null, new Error($this->currentUrl, $response));
              }

              $this->finishedEtags['etags'][] = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
              $partNumber += 1;
              $uploaded += $blockSize;
              $this->finishedEtags['uploaded'] = $uploaded;

              // Record the breakpoint.
              $this->saveResumeRecord($this->finishedEtags);
          }

          list($ret, $err) = $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
          if ($err !== null) {
              $response = $err->getResponse();
              if ($isResumeUpload && $response->statusCode === 612) {
                  return $this->uploadV2($fname);
              }
          }
          return array($ret, $err);
      }

      /**
       * Create a block (v1): POST the data to /mkblk/<blockSize>.
       *
       * @param string $block block data
       * @param int $blockSize size announced in the URL
       */
      private function makeBlock($block, $blockSize)
      {
          $url = $this->host . '/mkblk/' . $blockSize;
          return $this->post($url, $block);
      }

      /**
       * Build the v1 mkfile URL from size, mime type, key, file name and custom params.
       *
       * @param string $fname file name
       * @return string
       */
      private function fileUrl($fname)
      {
          $url = $this->host . '/mkfile/' . $this->size;
          $url .= '/mimeType/' . \Qiniu\base64_urlSafeEncode($this->mime);
          if ($this->key != null) {
              $url .= '/key/' . \Qiniu\base64_urlSafeEncode($this->key);
          }
          $url .= '/fname/' . \Qiniu\base64_urlSafeEncode($fname);
          if (!empty($this->params)) {
              foreach ($this->params as $key => $value) {
                  $val = \Qiniu\base64_urlSafeEncode($value);
                  $url .= "/$key/$val";
              }
          }
          return $url;
      }

      /**
       * Create the file (v1) from the uploaded block contexts.
       *
       * @param string $fname file name
       * @return array{array | null, Error | null}
       */
      private function makeFile($fname)
      {
          $url = $this->fileUrl($fname);

          // The body lists the contexts in block order; sort by index so that
          // the order does not depend on the order of insertion.
          ksort($this->contexts);
          $ctxList = array();
          foreach ($this->contexts as $ctx) {
              $ctxList[] = $ctx['ctx'];
          }
          $body = implode(',', $ctxList);

          $response = $this->post($url, $body);
          if ($response->needRetry()) {
              $response = $this->post($url, $body);
          }
          // The record is of no further use after a 200 or a 701.
          if ($response->statusCode === 200 || $response->statusCode === 701) {
              $this->removeResumeRecord();
          }
          if (!$response->ok()) {
              return array(null, new Error($this->currentUrl, $response));
          }
          return array($response->json(), null);
      }

      /**
       * POST with only the UpToken authorization header.
       */
      private function post($url, $data)
      {
          return $this->postWithHeaders($url, $data, array('Authorization' => 'UpToken ' . $this->upToken));
      }

      /**
       * Size of the block that starts at offset $uploaded: the part size, or
       * whatever remains when less than one part is left.
       *
       * @param int $uploaded number of bytes already handled
       * @return int
       */
      private function blockSize($uploaded)
      {
          if ($this->size < $uploaded + $this->partSize) {
              return $this->size - $uploaded;
          }
          return $this->partSize;
      }

      /**
       * Start a v2 upload and store the returned uploadId and expiry.
       *
       * @param string $encodedObjectName encoded storage path
       * @return Error|null
       */
      private function makeInitReq($encodedObjectName)
      {
          list($ret, $err) = $this->initReq($encodedObjectName);
          if ($ret == null) {
              return $err;
          }
          $this->finishedEtags["uploadId"] = $ret['uploadId'];
          $this->finishedEtags["expiredAt"] = $ret['expireAt'];
          return $err;
      }

      /**
       * Initialise the upload task (v2).
       *
       * @param string $encodedObjectName encoded storage path
       * @return array{array | null, Error | null}
       */
      private function initReq($encodedObjectName)
      {
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads';
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/json'
          );
          $response = $this->postWithHeaders($url, null, $headers);
          $ret = $response->json();
          if ($response->ok() && $ret != null) {
              return array($ret, null);
          }
          return array(null, new Error($url, $response));
      }

      /**
       * Upload one part (v2). The record file is deleted when the response
       * status is 612.
       *
       * @param string $block part data
       * @param int $partNumber 1-based part number
       * @param string $uploadId obtained from {@see initReq}
       * @param string $encodedObjectName encoded storage path
       * @param string $md5 hex md5 of $block, sent as Content-MD5
       */
      private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $md5)
      {
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/octet-stream',
              'Content-MD5' => $md5
          );
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName .
              '/uploads/' . $uploadId . '/' . $partNumber;
          $response = $this->put($url, $block, $headers);
          if ($response->statusCode === 612) {
              $this->removeResumeRecord();
          }
          return $response;
      }

      /**
       * Complete the multipart upload (v2).
       *
       * @param string $fname file name
       * @param string $uploadId obtained from {@see initReq}
       * @param string $encodedObjectName encoded storage path
       * @return array{array | null, Error | null}
       */
      private function completeParts($fname, $uploadId, $encodedObjectName)
      {
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/json'
          );
          $etags = $this->finishedEtags['etags'];
          $sortedEtags = \Qiniu\arraySort($etags, 'partNumber');

          // Split the custom params into "x:" variables and "x-qn-meta-" metadata;
          // any other key is not sent.
          $metadata = array();
          $customVars = array();
          if ($this->params) {
              foreach ($this->params as $k => $v) {
                  if (strpos($k, 'x:') === 0) {
                      $customVars[$k] = $v;
                  } elseif (strpos($k, 'x-qn-meta-') === 0) {
                      $metadata[$k] = $v;
                  }
              }
          }
          if (empty($metadata)) {
              $metadata = null;
          }
          if (empty($customVars)) {
              $customVars = null;
          }

          $body = array(
              'fname' => $fname,
              'mimeType' => $this->mime,
              'metadata' => $metadata,
              'customVars' => $customVars,
              'parts' => $sortedEtags
          );
          $jsonBody = json_encode($body);
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads/' . $uploadId;

          $response = $this->postWithHeaders($url, $jsonBody, $headers);
          if ($response->needRetry()) {
              $response = $this->postWithHeaders($url, $jsonBody, $headers);
          }
          // The record is of no further use after a 200 or a 612.
          if ($response->statusCode === 200 || $response->statusCode === 612) {
              $this->removeResumeRecord();
          }
          if (!$response->ok()) {
              return array(null, new Error($this->currentUrl, $response));
          }
          return array($response->json(), null);
      }

      /**
       * PUT with explicit headers; remembers the URL for error reporting.
       */
      private function put($url, $data, $headers)
      {
          $this->currentUrl = $url;
          return Client::put($url, $data, $headers, $this->reqOpt);
      }

      /**
       * POST with explicit headers; remembers the URL for error reporting.
       */
      private function postWithHeaders($url, $data, $headers)
      {
          $this->currentUrl = $url;
          return Client::post($url, $data, $headers, $this->reqOpt);
      }
  }