如何使用Laravel Queue在S3上拦截新文件?

我有一个名为 mybucket 的 S3 存储桶,想要在有新文件被复制到该存储桶时执行某些操作。对于通知,我想使用 SQS 队列 notifiqueue,因为我的目标是使用 Laravel 来访问这个队列。


由于我是在 CloudFormation 中创建基础架构的,因此资源的定义如下:


# SQS queue that the bucket below publishes its event notifications to.
NotificationQueue:

  Type: AWS::SQS::Queue

  Properties:

    VisibilityTimeout: 120

    QueueName: 'NotificationQueue'


# S3 bucket whose object-creation events are forwarded to NotificationQueue.
# NOTE(review): no AWS::SQS::QueuePolicy is visible in this snippet; S3 presumably
# needs permission to send to the queue — confirm it is defined elsewhere.
DataGateBucket:

  Type: AWS::S3::Bucket

  Properties:

    AccessControl: BucketOwnerFullControl

    BucketName: 'mybucket'

    NotificationConfiguration:

      QueueConfigurations:

        # The wildcard matches every ObjectCreated event type (the sample payload
        # further down shows "ObjectCreated:Put").
        - Event: 's3:ObjectCreated:*'

          # Target queue, referenced by the ARN of the NotificationQueue resource above.
          Queue: !GetAtt NotificationQueue.Arn

每次将新文件保存在存储桶中时,S3都会在SQS中自动创建一个通知。


遗憾的是,该有效负载的格式与 Laravel 标准作业的有效负载不兼容,如果我在 NotificationQueue 上运行 worker 进程,就会收到此错误:


local.ERROR: Undefined index: job {"exception":"[object] (ErrorException(code: 0): Undefined index: job at .../vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:273)

为了提供更完整的指示,这是我在通知中得到的内容(将JSON转换为PHP数组之后)


array:1 [

  "Records" => array:1 [

    0 => array:9 [

      "eventVersion" => "2.1"

      "eventSource" => "aws:s3"

      "awsRegion" => "eu-central-1"

      "eventTime" => "2019-04-23T17:02:41.308Z"

      "eventName" => "ObjectCreated:Put"

      "userIdentity" => array:1 [

        "principalId" => "AWS:XXXXXXXXXXXXXXXXXX"

      ]

      "requestParameters" => array:1 [

        "sourceIPAddress" => "217.64.198.7"

      ]

      "responseElements" => array:2 [

        "x-amz-request-id" => "602CE18B8DE0BE5C"

        "x-amz-id-2" => "wA/A3Jl2XpoxBWJEgQzy11s6O28Cz9Wc6pVi6Ho1vnIrOjqsWkGozlUmqRdpYAfub0MqdF8d/YI="

      ]

      "s3" => array:4 [

        "s3SchemaVersion" => "1.0"

        "configurationId" => "0d4eaa75-5730-495e-b6d4-368bf3690f30"

        "bucket" => array:3 [

          "name" => "mybucket"

          "ownerIdentity" => array:1 [

            "principalId" => "XXXXXXXXXXXXXXXXXX"

          ]


使用 Laravel 访问这些通知的最有效、最正确的方法是什么,以便我可以触发其他操作来响应文件上传?


九州编程
浏览 135回答 1
1回答

米琪卡哇伊

我找到了一种实现所需行为的方法,但我不确定这是否是最佳方法,因此将其发布在这里,希望能得到一些反馈。谈到 Laravel 队列时,很多配置来自 app.php,特别是其中的 providers 一节。我通过覆盖原始的 QueueServiceProvider 类并将其替换,添加了所需的行为:

// Here is the original Provider Class
//Illuminate\Queue\QueueServiceProvider::class,
// Here is the overridden Provider
\App\Providers\QueueServiceProvider::class,

新的 QueueServiceProvider 类如下:

<?php
namespace App\Providers;

use App\Jobs\SqsNotifications\SqsConnector;

class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
{
    /**
     * Register the Amazon SQS queue connector.
     *
     * @param  \Illuminate\Queue\QueueManager  $manager
     * @return void
     */
    protected function registerSqsNotifConnector($manager)
    {
        $manager->addConnector('sqsNotif', function () {
            return new SqsConnector();
        });
    }

    public function registerConnectors($manager){
        parent::registerConnectors($manager);
        // Add the custom SQS notification connector
        $this->registerSqsNotifConnector($manager);
    }
}

请注意,需要将新的连接器 sqsNotif 添加到 queue.php:

'sqsNotif' => [
        'driver' => 'sqsNotif',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
        'queue' => env('SQS_QUEUE', 'your-queue-name'),
        'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
],

在新的 QueueServiceProvider 中,我们只是注册了一个额外的连接器,其代码为:

<?php
namespace App\Jobs\SqsNotifications;

use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;

class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
    /**
     * Establish a queue connection.
     *
     * @param  array  $config
     * @return \Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

        return new SqsQueue(
            new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
        );
    }
}

SqsQueue 也以如下方式重新定义:

<?php
namespace App\Jobs\SqsNotifications;

class SqsQueue extends \Illuminate\Queue\SqsQueue
{
    /**
     * Pop the next job off of the queue.
     *
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue = $this->getQueue($queue),
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
            return new SqsJob(
                $this->container, $this->sqs, $response['Messages'][0],
                $this->connectionName, $queue
            );
        }
    }
}

最后缺少的部分是 SqsJob,其定义如下:

<?php
namespace App\Jobs\SqsNotifications;

use Illuminate\Queue\Jobs\JobName;

/**
 * Class SqsJob
 * @package App\Jobs\SqsNotifications
 *
 * Alternate SQS job that is used in case of S3 notifications
 */
class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
{
    /**
     * Get the name of the queued job class.
     *
     * @return string
     */
    public function getName()
    {
        $bucketName = '';

        // Define the name of the Process based on the bucket name
        switch($this->payload()['Records'][0]['s3']['bucket']['name']){
            case 'mybucket':
                $bucketName = 'NewMyBucketFileJob';
                break;
        }

        return $bucketName;
    }

    /**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        // Mimic the original behavior with a different payload
        $payload = $this->payload();

        [$class, $method] = JobName::parse('\App\Jobs\\' . $this->getName() . '@handle');

        ($this->instance = $this->resolve($class))->{$method}($payload);

        // The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
        $this->delete();
    }
}

至此,我只需要定义一个用于处理的 Job,例如 NewMyBucketFileJob,代码如下(注意:下面示例中的类名是 ProcessDataGateNewFile,而 getName() 返回的是 NewMyBucketFileJob;实际使用时两者必须保持一致):

<?php
namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessDataGateNewFile implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle($data)
    {
        // Print the whole data structure
        print_r($data);

        // Or just the name of the uploaded file
        print_r($data['Records'][0]['s3']['object']['key']);
    }
}

这个方案是可行的,但它涉及许多类扩展,而且一旦将来的版本更改了队列的内部实现,它就会非常脆弱。老实说,我想知道是否有更简单或更健壮的方法。
打开App,查看更多内容
随时随地看视频慕课网APP