| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 | 
							- <?php
 
- namespace Qiniu\Storage;
 
- use Qiniu\Config;
 
- use Qiniu\Http\Client;
 
- use Qiniu\Http\Error;
 
- use Qiniu\Enum\SplitUploadVersion;
 
- use Qiniu\Http\RequestOptions;
 
- /**
 
-  * 断点续上传类, 该类主要实现了断点续上传中的分块上传,
 
-  * 以及相应地创建块和创建文件过程.
 
-  *
 
-  * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkblk.html
 
-  * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkfile.html
 
-  */
 
- final class ResumeUploader
 
- {
 
-     private $upToken;
 
-     private $key;
 
-     private $inputStream;
 
-     private $size;
 
-     private $params;
 
-     private $mime;
 
-     private $contexts;
 
-     private $finishedEtags;
 
-     private $host;
 
-     private $bucket;
 
-     private $currentUrl;
 
-     private $config;
 
-     private $resumeRecordFile;
 
-     private $version;
 
-     private $partSize;
 
-     /**
 
-      * @var RequestOptions
 
-      */
 
-     private $reqOpt;
 
-     /**
 
-      * 上传二进制流到七牛
 
-      *
 
-      * @param string $upToken 上传凭证
 
-      * @param string $key 上传文件名
 
-      * @param resource $inputStream 上传二进制流
 
-      * @param int $size 上传流的大小
 
-      * @param array<string, string> $params 自定义变量
 
-      * @param string $mime 上传数据的mimeType
 
-      * @param Config $config
 
-      * @param string $resumeRecordFile 断点续传的已上传的部分信息记录文件
 
-      * @param string $version 分片上传版本 目前支持v1/v2版本 默认v1
 
-      * @param int $partSize 分片上传v2字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
 
-      * @param RequestOptions $reqOpt 分片上传v2字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
 
-      * @throws \Exception
 
-      *
 
-      * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
 
-      */
 
-     public function __construct(
 
-         $upToken,
 
-         $key,
 
-         $inputStream,
 
-         $size,
 
-         $params,
 
-         $mime,
 
-         $config,
 
-         $resumeRecordFile = null,
 
-         $version = 'v1',
 
-         $partSize = config::BLOCK_SIZE,
 
-         $reqOpt = null
 
-     ) {
 
-         $this->upToken = $upToken;
 
-         $this->key = $key;
 
-         $this->inputStream = $inputStream;
 
-         $this->size = $size;
 
-         $this->params = $params;
 
-         $this->mime = $mime;
 
-         $this->contexts = array();
 
-         $this->finishedEtags = array("etags" => array(), "uploadId" => "", "expiredAt" => 0, "uploaded" => 0);
 
-         $this->config = $config;
 
-         $this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
 
-         $this->partSize = $partSize ? $partSize : config::BLOCK_SIZE;
 
-         if ($reqOpt === null) {
 
-             $reqOpt = new RequestOptions();
 
-         }
 
-         $this->reqOpt = $reqOpt;
 
-         try {
 
-             $this->version = SplitUploadVersion::from($version ? $version : 'v1');
 
-         } catch (\Exception $e) {
 
-             throw new \Exception("only support v1/v2 now!", 0, $e);
 
-         }
 
-         list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
 
-         $this->bucket = $bucket;
 
-         if ($err != null) {
 
-             return array(null, $err);
 
-         }
 
-         list($upHost, $err) = $config->getUpHostV2($accessKey, $bucket, $reqOpt);
 
-         if ($err != null) {
 
-             throw new \Exception($err->message(), 1);
 
-         }
 
-         $this->host = $upHost;
 
-     }
 
-     /**
 
-      * 上传操作
 
-      * @param $fname string 文件名
 
-      *
 
-      * @throws \Exception
 
-      */
 
-     public function upload($fname)
 
-     {
 
-         $blkputRets = null;
 
-         // get upload record from resumeRecordFile
 
-         if ($this->resumeRecordFile != null) {
 
-             if (file_exists($this->resumeRecordFile)) {
 
-                 $stream = fopen($this->resumeRecordFile, 'r');
 
-                 if ($stream) {
 
-                     $streamLen = filesize($this->resumeRecordFile);
 
-                     if ($streamLen > 0) {
 
-                         $contents = fread($stream, $streamLen);
 
-                         fclose($stream);
 
-                         if ($contents) {
 
-                             $blkputRets = json_decode($contents, true);
 
-                             if ($blkputRets === null) {
 
-                                 error_log("resumeFile contents decode error");
 
-                             }
 
-                         } else {
 
-                             error_log("read resumeFile failed");
 
-                         }
 
-                     } else {
 
-                         error_log("resumeFile is empty");
 
-                     }
 
-                 } else {
 
-                     error_log("resumeFile open failed");
 
-                 }
 
-             } else {
 
-                 error_log("resumeFile not exists");
 
-             }
 
-         }
 
-         if ($this->version == SplitUploadVersion::V1) {
 
-             return $this->uploadV1($fname, $blkputRets);
 
-         } elseif ($this->version == SplitUploadVersion::V2) {
 
-             return $this->uploadV2($fname, $blkputRets);
 
-         } else {
 
-             throw new \Exception("only support v1/v2 now!");
 
-         }
 
-     }
 
-     /**
 
-      * @param string $fname 文件名
 
-      * @param null|array $blkputRets
 
-      *
 
-      * @throws \Exception
 
-      */
 
-     private function uploadV1($fname, $blkputRets = null)
 
-     {
 
-         // 尝试恢复恢复已上传的数据
 
-         $isResumeUpload = $blkputRets !== null;
 
-         $this->contexts = array();
 
-         if ($blkputRets) {
 
-             if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) &&
 
-                 is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])
 
-             ) {
 
-                 $this->contexts = array_map(function ($ctx) {
 
-                     if (is_array($ctx)) {
 
-                         return $ctx;
 
-                     } else {
 
-                         // 兼容旧版本(旧版本没有存储 expireAt)
 
-                         return array(
 
-                             "ctx" => $ctx,
 
-                             "expiredAt" => 0,
 
-                         );
 
-                     }
 
-                 }, $blkputRets['contexts']);
 
-             }
 
-         }
 
-         // 上传分片
 
-         $uploaded = 0;
 
-         while ($uploaded < $this->size) {
 
-             $blockSize = $this->blockSize($uploaded);
 
-             $blockIndex = $uploaded / $this->partSize;
 
-             if (!is_int($blockIndex)) {
 
-                 throw new \Exception("v1 part size changed");
 
-             }
 
-             // 如果已上传该分片且没有过期
 
-             if (isset($this->contexts[$blockIndex]) && $this->contexts[$blockIndex]["expiredAt"] >= time()) {
 
-                 $uploaded += $blockSize;
 
-                 fseek($this->inputStream, $blockSize, SEEK_CUR);
 
-                 continue;
 
-             }
 
-             $data = fread($this->inputStream, $blockSize);
 
-             if ($data === false) {
 
-                 throw new \Exception("file read failed", 1);
 
-             }
 
-             $crc = \Qiniu\crc32_data($data);
 
-             $response = $this->makeBlock($data, $blockSize);
 
-             $ret = null;
 
-             if ($response->ok() && $response->json() != null) {
 
-                 $ret = $response->json();
 
-             }
 
-             if ($response->statusCode < 0) {
 
-                 list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
 
-                 if ($err != null) {
 
-                     return array(null, $err);
 
-                 }
 
-                 list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket, $this->reqOpt);
 
-                 if ($err != null) {
 
-                     return array(null, $err);
 
-                 }
 
-                 $this->host = $upHostBackup;
 
-             }
 
-             if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
 
-                 $response = $this->makeBlock($data, $blockSize);
 
-                 $ret = $response->json();
 
-             }
 
-             if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
 
-                 return array(null, new Error($this->currentUrl, $response));
 
-             }
 
-             // 如果可以在已上传取到说明是过期分片直接修改已上传信息,否则是新的片添加到已上传分片尾部
 
-             if (isset($this->contexts[$blockIndex])) {
 
-                 $this->contexts[$blockIndex] = array(
 
-                     'ctx' => $ret['ctx'],
 
-                     'expiredAt' => $ret['expired_at'],
 
-                 );
 
-             } else {
 
-                 array_push($this->contexts, array(
 
-                     'ctx' => $ret['ctx'],
 
-                     'expiredAt' => $ret['expired_at'],
 
-                 ));
 
-             }
 
-             $uploaded += $blockSize;
 
-             // 记录断点
 
-             if ($this->resumeRecordFile !== null) {
 
-                 $recordData = array(
 
-                     'contexts' => $this->contexts,
 
-                     'uploaded' => $uploaded
 
-                 );
 
-                 $recordData = json_encode($recordData);
 
-                 if ($recordData) {
 
-                     $isWritten = file_put_contents($this->resumeRecordFile, $recordData);
 
-                     if ($isWritten === false) {
 
-                         error_log("write resumeRecordFile failed");
 
-                     }
 
-                 } else {
 
-                     error_log('resumeRecordData encode failed');
 
-                 }
 
-             }
 
-         }
 
-         // 完成上传
 
-         list($ret, $err) = $this->makeFile($fname);
 
-         if ($err !== null) {
 
-             $response = $err->getResponse();
 
-             if ($isResumeUpload && $response->statusCode === 701) {
 
-                 fseek($this->inputStream, 0);
 
-                 return $this->uploadV1($fname);
 
-             }
 
-         }
 
-         return array($ret, $err);
 
-     }
 
-     /**
 
-      * @param string $fname 文件名
 
-      * @param null|array $blkputRets
 
-      *
 
-      * @throws \Exception
 
-      */
 
-     private function uploadV2($fname, $blkputRets = null)
 
-     {
 
-         $uploaded = 0;
 
-         $partNumber = 1;
 
-         $encodedObjectName = $this->key ? \Qiniu\base64_urlSafeEncode($this->key) : '~';
 
-         $isResumeUpload = $blkputRets !== null;
 
-         // 初始化 upload id
 
-         $err = null;
 
-         if ($blkputRets) {
 
-             if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) &&
 
-                 isset($blkputRets["expiredAt"]) && $blkputRets["expiredAt"] > time() &&
 
-                 $blkputRets["uploaded"] > 0 && is_array($blkputRets["etags"]) &&
 
-                 is_string($blkputRets["uploadId"]) && is_int($blkputRets["expiredAt"])
 
-             ) {
 
-                 $this->finishedEtags['etags'] = $blkputRets["etags"];
 
-                 $this->finishedEtags["uploadId"] = $blkputRets["uploadId"];
 
-                 $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"];
 
-                 $this->finishedEtags["uploaded"] = $blkputRets["uploaded"];
 
-                 $uploaded = $blkputRets["uploaded"];
 
-                 $partNumber = count($this->finishedEtags["etags"]) + 1;
 
-             } else {
 
-                 $err = $this->makeInitReq($encodedObjectName);
 
-             }
 
-         } else {
 
-             $err = $this->makeInitReq($encodedObjectName);
 
-         }
 
-         if ($err != null) {
 
-             return array(null, $err);
 
-         }
 
-         // 上传分片
 
-         fseek($this->inputStream, $uploaded);
 
-         while ($uploaded < $this->size) {
 
-             $blockSize = $this->blockSize($uploaded);
 
-             $data = fread($this->inputStream, $blockSize);
 
-             if ($data === false) {
 
-                 throw new \Exception("file read failed", 1);
 
-             }
 
-             $md5 = md5($data);
 
-             $response = $this->uploadPart(
 
-                 $data,
 
-                 $partNumber,
 
-                 $this->finishedEtags["uploadId"],
 
-                 $encodedObjectName,
 
-                 $md5
 
-             );
 
-             $ret = null;
 
-             if ($response->ok() && $response->json() != null) {
 
-                 $ret = $response->json();
 
-             }
 
-             if ($response->statusCode < 0) {
 
-                 list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
 
-                 if ($err != null) {
 
-                     return array(null, $err);
 
-                 }
 
-                 list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket, $this->reqOpt);
 
-                 if ($err != null) {
 
-                     return array(null, $err);
 
-                 }
 
-                 $this->host = $upHostBackup;
 
-             }
 
-             if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) {
 
-                 $response = $this->uploadPart(
 
-                     $data,
 
-                     $partNumber,
 
-                     $this->finishedEtags["uploadId"],
 
-                     $encodedObjectName,
 
-                     $md5
 
-                 );
 
-                 $ret = $response->json();
 
-             }
 
-             if ($isResumeUpload && $response->statusCode === 612) {
 
-                 return $this->uploadV2($fname);
 
-             }
 
-             if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) {
 
-                 return array(null, new Error($this->currentUrl, $response));
 
-             }
 
-             $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
 
-             array_push($this->finishedEtags['etags'], $blockStatus);
 
-             $partNumber += 1;
 
-             $uploaded += $blockSize;
 
-             $this->finishedEtags['uploaded'] = $uploaded;
 
-             if ($this->resumeRecordFile !== null) {
 
-                 $recordData = json_encode($this->finishedEtags);
 
-                 if ($recordData) {
 
-                     $isWritten = file_put_contents($this->resumeRecordFile, $recordData);
 
-                     if ($isWritten === false) {
 
-                         error_log("write resumeRecordFile failed");
 
-                     }
 
-                 } else {
 
-                     error_log('resumeRecordData encode failed');
 
-                 }
 
-             }
 
-         }
 
-         list($ret, $err) = $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
 
-         if ($err !== null) {
 
-             $response = $err->getResponse();
 
-             if ($isResumeUpload && $response->statusCode === 612) {
 
-                 return $this->uploadV2($fname);
 
-             }
 
-         }
 
-         return array($ret, $err);
 
-     }
 
-     /**
 
-      * 创建块
 
-      */
 
-     private function makeBlock($block, $blockSize)
 
-     {
 
-         $url = $this->host . '/mkblk/' . $blockSize;
 
-         return $this->post($url, $block);
 
-     }
 
-     private function fileUrl($fname)
 
-     {
 
-         $url = $this->host . '/mkfile/' . $this->size;
 
-         $url .= '/mimeType/' . \Qiniu\base64_urlSafeEncode($this->mime);
 
-         if ($this->key != null) {
 
-             $url .= '/key/' . \Qiniu\base64_urlSafeEncode($this->key);
 
-         }
 
-         $url .= '/fname/' . \Qiniu\base64_urlSafeEncode($fname);
 
-         if (!empty($this->params)) {
 
-             foreach ($this->params as $key => $value) {
 
-                 $val = \Qiniu\base64_urlSafeEncode($value);
 
-                 $url .= "/$key/$val";
 
-             }
 
-         }
 
-         return $url;
 
-     }
 
-     /**
 
-      * 创建文件
 
-      *
 
-      * @param string $fname 文件名
 
-      * @return array{array | null, Error | null}
 
-      */
 
-     private function makeFile($fname)
 
-     {
 
-         $url = $this->fileUrl($fname);
 
-         $body = implode(',', array_map(function ($ctx) {
 
-             return $ctx['ctx'];
 
-         }, $this->contexts));
 
-         $response = $this->post($url, $body);
 
-         if ($response->needRetry()) {
 
-             $response = $this->post($url, $body);
 
-         }
 
-         if ($response->statusCode === 200 || $response->statusCode === 701) {
 
-             if ($this->resumeRecordFile !== null) {
 
-                 @unlink($this->resumeRecordFile);
 
-             }
 
-         }
 
-         if (!$response->ok()) {
 
-             return array(null, new Error($this->currentUrl, $response));
 
-         }
 
-         return array($response->json(), null);
 
-     }
 
-     private function post($url, $data)
 
-     {
 
-         $this->currentUrl = $url;
 
-         $headers = array('Authorization' => 'UpToken ' . $this->upToken);
 
-         return Client::post($url, $data, $headers, $this->reqOpt);
 
-     }
 
-     private function blockSize($uploaded)
 
-     {
 
-         if ($this->size < $uploaded + $this->partSize) {
 
-             return $this->size - $uploaded;
 
-         }
 
-         return $this->partSize;
 
-     }
 
-     private function makeInitReq($encodedObjectName)
 
-     {
 
-         list($ret, $err) = $this->initReq($encodedObjectName);
 
-         if ($ret == null) {
 
-             return $err;
 
-         }
 
-         $this->finishedEtags["uploadId"] = $ret['uploadId'];
 
-         $this->finishedEtags["expiredAt"] = $ret['expireAt'];
 
-         return $err;
 
-     }
 
-     /**
 
-      * 初始化上传任务
 
-      */
 
-     private function initReq($encodedObjectName)
 
-     {
 
-         $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads';
 
-         $headers = array(
 
-             'Authorization' => 'UpToken ' . $this->upToken,
 
-             'Content-Type' => 'application/json'
 
-         );
 
-         $response = $this->postWithHeaders($url, null, $headers);
 
-         $ret = $response->json();
 
-         if ($response->ok() && $ret != null) {
 
-             return array($ret, null);
 
-         }
 
-         return array(null, new Error($url, $response));
 
-     }
 
-     /**
 
-      * 分块上传v2
 
-      */
 
-     private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $md5)
 
-     {
 
-         $headers = array(
 
-             'Authorization' => 'UpToken ' . $this->upToken,
 
-             'Content-Type' => 'application/octet-stream',
 
-             'Content-MD5' => $md5
 
-         );
 
-         $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName .
 
-             '/uploads/' . $uploadId . '/' . $partNumber;
 
-         $response = $this->put($url, $block, $headers);
 
-         if ($response->statusCode === 612) {
 
-             if ($this->resumeRecordFile !== null) {
 
-                 @unlink($this->resumeRecordFile);
 
-             }
 
-         }
 
-         return $response;
 
-     }
 
-     /**
 
-      * 完成分片上传V2
 
-      *
 
-      * @param string $fname 文件名
 
-      * @param int $uploadId 由 {@see initReq} 获取
 
-      * @param string $encodedObjectName 经过编码的存储路径
 
-      * @return array{array | null, Error | null}
 
-      */
 
-     private function completeParts($fname, $uploadId, $encodedObjectName)
 
-     {
 
-         $headers = array(
 
-             'Authorization' => 'UpToken ' . $this->upToken,
 
-             'Content-Type' => 'application/json'
 
-         );
 
-         $etags = $this->finishedEtags['etags'];
 
-         $sortedEtags = \Qiniu\arraySort($etags, 'partNumber');
 
-         $metadata = array();
 
-         $customVars = array();
 
-         if ($this->params) {
 
-             foreach ($this->params as $k => $v) {
 
-                 if (strpos($k, 'x:') === 0) {
 
-                     $customVars[$k] = $v;
 
-                 } elseif (strpos($k, 'x-qn-meta-') === 0) {
 
-                     $metadata[$k] = $v;
 
-                 }
 
-             }
 
-         }
 
-         if (empty($metadata)) {
 
-             $metadata = null;
 
-         }
 
-         if (empty($customVars)) {
 
-             $customVars = null;
 
-         }
 
-         $body = array(
 
-             'fname' => $fname,
 
-             'mimeType' => $this->mime,
 
-             'metadata' => $metadata,
 
-             'customVars' => $customVars,
 
-             'parts' => $sortedEtags
 
-         );
 
-         $jsonBody = json_encode($body);
 
-         $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads/' . $uploadId;
 
-         $response = $this->postWithHeaders($url, $jsonBody, $headers);
 
-         if ($response->needRetry()) {
 
-             $response = $this->postWithHeaders($url, $jsonBody, $headers);
 
-         }
 
-         if ($response->statusCode === 200 || $response->statusCode === 612) {
 
-             if ($this->resumeRecordFile !== null) {
 
-                 @unlink($this->resumeRecordFile);
 
-             }
 
-         }
 
-         if (!$response->ok()) {
 
-             return array(null, new Error($this->currentUrl, $response));
 
-         }
 
-         return array($response->json(), null);
 
-     }
 
-     private function put($url, $data, $headers)
 
-     {
 
-         $this->currentUrl = $url;
 
-         return Client::put($url, $data, $headers, $this->reqOpt);
 
-     }
 
-     private function postWithHeaders($url, $data, $headers)
 
-     {
 
-         $this->currentUrl = $url;
 
-         return Client::post($url, $data, $headers, $this->reqOpt);
 
-     }
 
- }
 
 
  |